This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 57e3f03ccd719ed772c983ba335517d95f8f3e6a Author: Hang Ruan <[email protected]> AuthorDate: Tue Feb 15 20:15:21 2022 +0800 [FLINK-25289][tests] Introduce sink test suite in connector test framework This closes #18496. --- .../connector/kafka/sink/KafkaSinkITCase.java | 46 ++ .../kafka/sink/testutils/KafkaDataReader.java | 69 +++ .../sink/testutils/KafkaSinkExternalContext.java | 272 +++++++++ .../testutils/KafkaSinkExternalContextFactory.java | 53 ++ .../flink-end-to-end-tests-common-kafka/pom.xml | 10 +- .../flink/tests/util/kafka/KafkaSinkE2ECase.java | 87 +++ .../flink-connector-test-utils/pom.xml | 26 + .../environment/MiniClusterTestEnvironment.java | 22 +- .../sink/DataStreamSinkExternalContext.java | 10 - ...t.java => DataStreamSinkV1ExternalContext.java} | 29 +- .../sink/DataStreamSinkV2ExternalContext.java | 38 ++ .../testframe/source/FromElementsSource.java | 103 ++++ .../testframe/source/FromElementsSourceReader.java | 134 +++++ .../testframe/source/enumerator/NoOpEnumState.java | 22 + .../source/enumerator/NoOpEnumStateSerializer.java | 41 ++ .../source/enumerator/NoOpEnumerator.java | 50 ++ .../testframe/source/split/FromElementsSplit.java | 46 ++ .../source/split/FromElementsSplitSerializer.java | 55 ++ .../testframe/testsuites/SinkTestSuiteBase.java | 643 +++++++++++++++++++++ .../testframe/testsuites/SourceTestSuiteBase.java | 15 +- .../connector/testframe/utils/MetricQuerier.java | 43 +- 21 files changed, 1744 insertions(+), 70 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java index 2a35112..4234f7a 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java @@ -31,12 +31,23 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory; import org.apache.flink.connector.kafka.testutils.KafkaUtil; +import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment; +import org.apache.flink.connector.testframe.environment.TestEnvironment; +import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem; +import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext; +import org.apache.flink.connector.testframe.junit.annotations.TestContext; +import org.apache.flink.connector.testframe.junit.annotations.TestEnv; +import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; +import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; +import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import 
org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; @@ -48,6 +59,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.test.util.TestUtils; import org.apache.flink.testutils.junit.SharedObjects; import org.apache.flink.testutils.junit.SharedReference; +import org.apache.flink.util.DockerImageVersions; import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.guava30.com.google.common.base.Joiner; @@ -69,11 +81,15 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.TestTemplate; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; +import org.testcontainers.utility.DockerImageName; import javax.annotation.Nullable; @@ -161,6 +177,36 @@ public class KafkaSinkITCase extends TestLogger { deleteTestTopic(topic); } + /** Integration test based on connector testing framework. */ + @Nested + class IntegrationTests extends SinkTestSuiteBase<String> { + // Defines test environment on Flink MiniCluster + @SuppressWarnings("unused") + @TestEnv + MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment(); + + // Defines external system + @TestExternalSystem + DefaultContainerizedExternalSystem<KafkaContainer> kafka = + DefaultContainerizedExternalSystem.builder() + .fromContainer( + new KafkaContainer( + DockerImageName.parse(DockerImageVersions.KAFKA))) + .build(); + + @SuppressWarnings("unused") + @TestSemantics + CheckpointingMode[] semantics = + new CheckpointingMode[] { + CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE + }; + + @SuppressWarnings("unused") + @TestContext + KafkaSinkExternalContextFactory sinkContext = + new KafkaSinkExternalContextFactory(kafka.getContainer(), Collections.emptyList()); + } + @Test public void testWriteRecordsToKafkaWithAtLeastOnceGuarantee() throws Exception { writeRecordsToKafka(DeliveryGuarantee.AT_LEAST_ONCE, emittedRecordsCount); diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaDataReader.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaDataReader.java new file mode 100644 index 0000000..0847cd4 --- /dev/null +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaDataReader.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.kafka.sink.testutils; + +import org.apache.flink.connector.testframe.external.ExternalSystemDataReader; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; + +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; + +/** Kafka DataStream data reader. */ +public class KafkaDataReader implements ExternalSystemDataReader<String> { + private final KafkaConsumer<String, String> consumer; + + public KafkaDataReader(Properties properties, Collection<TopicPartition> partitions) { + this.consumer = new KafkaConsumer<>(properties); + consumer.assign(partitions); + consumer.seekToBeginning(partitions); + } + + @Override + public List<String> poll(Duration timeout) { + List<String> result = new LinkedList<>(); + ConsumerRecords<String, String> consumerRecords; + try { + consumerRecords = consumer.poll(timeout); + } catch (WakeupException we) { + return Collections.emptyList(); + } + Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator(); + while (iterator.hasNext()) { + result.add(iterator.next().value()); + } + return result; + } + + @Override + public void close() throws Exception { + if (consumer != null) { + consumer.close(); + } + } +} diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java new file mode 100644 index 0000000..7c287d2 --- /dev/null +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.kafka.sink.testutils; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; +import org.apache.flink.connector.kafka.sink.KafkaSink; +import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder; +import org.apache.flink.connector.testframe.external.ExternalSystemDataReader; +import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext; +import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings; +import org.apache.flink.streaming.api.CheckpointingMode; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE; + +/** A Kafka external context that will create only one topic and use partitions in that topic. 
*/ +public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext<String> { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkExternalContext.class); + + private static final String TOPIC_NAME_PREFIX = "kafka-single-topic"; + private static final long DEFAULT_TIMEOUT = 30L; + private static final int RANDOM_STRING_MAX_LENGTH = 50; + private static final int NUM_RECORDS_UPPER_BOUND = 500; + private static final int NUM_RECORDS_LOWER_BOUND = 100; + private static final int DEFAULT_TRANSACTION_TIMEOUT_IN_MS = 900000; + + protected String bootstrapServers; + protected final String topicName; + + private final List<ExternalSystemDataReader<String>> readers = new ArrayList<>(); + + protected int numSplits = 0; + + private List<URL> connectorJarPaths; + + protected final AdminClient kafkaAdminClient; + + public KafkaSinkExternalContext(String bootstrapServers, List<URL> connectorJarPaths) { + this.bootstrapServers = bootstrapServers; + this.connectorJarPaths = connectorJarPaths; + this.topicName = + TOPIC_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + kafkaAdminClient = createAdminClient(); + } + + private void createTopic(String topicName, int numPartitions, short replicationFactor) { + LOG.debug( + "Creating new Kafka topic {} with {} partitions and {} replicas", + topicName, + numPartitions, + replicationFactor); + NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor); + try { + kafkaAdminClient + .createTopics(Collections.singletonList(newTopic)) + .all() + .get(DEFAULT_TIMEOUT, TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException(String.format("Cannot create topic '%s'", topicName), e); + } + } + + private void deleteTopic(String topicName) { + LOG.debug("Deleting Kafka topic {}", topicName); + try { + kafkaAdminClient + .deleteTopics(Collections.singletonList(topicName)) + .all() + .get(DEFAULT_TIMEOUT, TimeUnit.SECONDS); + } catch (Exception e) { + if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) { + throw new RuntimeException( + String.format("Cannot delete unknown Kafka topic '%s'", topicName), e); + } + } + } + + private AdminClient createAdminClient() { + final Properties config = new Properties(); + config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + return AdminClient.create(config); + } + + @Override + public Sink<String> createSink(TestingSinkSettings sinkSettings) { + if (!topicExists(topicName)) { + createTopic(topicName, 4, (short) 1); + } + + KafkaSinkBuilder<String> builder = KafkaSink.builder(); + final Properties properties = new Properties(); + properties.put( + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_TRANSACTION_TIMEOUT_IN_MS); + builder.setBootstrapServers(bootstrapServers) + .setDeliverGuarantee(toDeliveryGuarantee(sinkSettings.getCheckpointingMode())) + .setTransactionalIdPrefix("testingFramework") + .setKafkaProducerConfig(properties) + .setRecordSerializer( + KafkaRecordSerializationSchema.builder() + .setTopic(topicName) + .setValueSerializationSchema(new SimpleStringSchema()) + .build()); + return builder.build(); + } + + @Override + public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) { + LOG.info("Fetching information for topic: {}", topicName); + final Map<String, TopicDescription> topicMetadata = + getTopicMetadata(Arrays.asList(topicName)); + + Set<TopicPartition> subscribedPartitions = new HashSet<>(); + for (TopicDescription topic : 
topicMetadata.values()) { + for (TopicPartitionInfo partition : topic.partitions()) { + subscribedPartitions.add(new TopicPartition(topic.name(), partition.partition())); + } + } + + Properties properties = new Properties(); + properties.setProperty( + ConsumerConfig.GROUP_ID_CONFIG, + "flink-kafka-test" + subscribedPartitions.hashCode()); + properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + properties.setProperty( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class.getCanonicalName()); + properties.setProperty( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class.getCanonicalName()); + if (EXACTLY_ONCE.equals(sinkSettings.getCheckpointingMode())) { + // default is read_uncommitted + properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); + } + properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + readers.add(new KafkaDataReader(properties, subscribedPartitions)); + return readers.get(readers.size() - 1); + } + + @Override + public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) { + Random random = new Random(seed); + List<String> randomStringRecords = new ArrayList<>(); + int recordNum = + random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND) + + NUM_RECORDS_LOWER_BOUND; + for (int i = 0; i < recordNum; i++) { + int stringLength = random.nextInt(RANDOM_STRING_MAX_LENGTH) + 1; + randomStringRecords.add(RandomStringUtils.random(stringLength, true, true)); + } + return randomStringRecords; + } + + protected Map<String, TopicDescription> getTopicMetadata(List<String> topics) { + try { + return kafkaAdminClient.describeTopics(topics).all().get(); + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to get metadata for topics %s.", topics), e); + } + } + + private boolean topicExists(String topic) { + try { + kafkaAdminClient.describeTopics(Arrays.asList(topic)).all().get(); + return true; + } catch (Exception e) { + return false; + } + } + + @Override + public void close() { + if (numSplits != 0) { + deleteTopic(topicName); + } + readers.stream() + .filter(Objects::nonNull) + .forEach( + reader -> { + try { + reader.close(); + } catch (Exception e) { + if (kafkaAdminClient != null) { + kafkaAdminClient.close(); + } + throw new RuntimeException("Cannot close data reader", e); + } + }); + readers.clear(); + if (kafkaAdminClient != null) { + kafkaAdminClient.close(); + } + } + + @Override + public String toString() { + return "Single-topic Kafka"; + } + + @Override + public List<URL> getConnectorJarPaths() { + return connectorJarPaths; + } + + @Override + public TypeInformation<String> getProducedType() { + return TypeInformation.of(String.class); + } + + private DeliveryGuarantee toDeliveryGuarantee(CheckpointingMode checkpointingMode) { + switch (checkpointingMode) { + case EXACTLY_ONCE: + return DeliveryGuarantee.EXACTLY_ONCE; + case AT_LEAST_ONCE: + return DeliveryGuarantee.AT_LEAST_ONCE; + default: + throw new IllegalArgumentException( + String.format( + "Only exactly-once and at-least-once checkpointing modes are supported, but the actual mode is %s.", + checkpointingMode)); + } + } +}
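The suite exercises this context by building the sink, running a Flink job that writes the generated records, and then draining a reader obtained from createSinkDataReader until the expected records arrive. A minimal sketch of such a consumption loop (illustrative only: `context`, `sinkSettings`, and `expected` stand in for values the calling test already holds, and the poll timeout is arbitrary):

    // Drain the sink output back out of Kafka. Under EXACTLY_ONCE the reader
    // only observes committed transactional records (isolation.level above).
    ExternalSystemDataReader<String> reader = context.createSinkDataReader(sinkSettings);
    List<String> consumed = new ArrayList<>();
    while (consumed.size() < expected.size()) {
        consumed.addAll(reader.poll(Duration.ofSeconds(5)));
    }
    reader.close(); // close() is declared to throw Exception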
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java new file mode 100644 index 0000000..b795854 --- /dev/null +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink.testutils; + +import org.apache.flink.connector.testframe.external.ExternalContextFactory; + +import org.testcontainers.containers.KafkaContainer; + +import java.net.URL; +import java.util.List; +import java.util.stream.Collectors; + +/** Kafka sink external context factory. */ +public class KafkaSinkExternalContextFactory + implements ExternalContextFactory<KafkaSinkExternalContext> { + + private final KafkaContainer kafkaContainer; + private final List<URL> connectorJars; + + public KafkaSinkExternalContextFactory(KafkaContainer kafkaContainer, List<URL> connectorJars) { + this.kafkaContainer = kafkaContainer; + this.connectorJars = connectorJars; + } + + private String getBootstrapServer() { + final String internalEndpoints = + kafkaContainer.getNetworkAliases().stream() + .map(host -> String.join(":", host, Integer.toString(9092))) + .collect(Collectors.joining(",")); + return String.join(",", kafkaContainer.getBootstrapServers(), internalEndpoints); + } + + @Override + public KafkaSinkExternalContext createExternalContext(String testName) { + return new KafkaSinkExternalContext(getBootstrapServer(), connectorJars); + } +} diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml index d37d55d..ec7bd0c 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml @@ -150,7 +150,6 @@ under the License. </exclusion> </exclusions> </dependency> - <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> @@ -225,6 +224,15 @@ under the License. 
<outputDirectory>${project.build.directory}/dependencies</outputDirectory> </artifactItem> <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-test-utils</artifactId> + <version>${project.version}</version> + <classifier>source</classifier> + <destFileName>flink-connector-testing.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> + <artifactItem> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.1</version> diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java new file mode 100644 index 0000000..520491e --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.kafka; + +import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory; +import org.apache.flink.connector.testframe.environment.TestEnvironment; +import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem; +import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext; +import org.apache.flink.connector.testframe.junit.annotations.TestContext; +import org.apache.flink.connector.testframe.junit.annotations.TestEnv; +import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; +import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; +import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.tests.util.TestUtils; +import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment; +import org.apache.flink.util.DockerImageVersions; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + +import java.util.Arrays; + +/** Kafka sink E2E test based on connector testing framework. 
*/ +@SuppressWarnings("unused") +public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> { + private static final String KAFKA_HOSTNAME = "kafka"; + + @TestSemantics + CheckpointingMode[] semantics = + new CheckpointingMode[] { + CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE + }; + + // Defines TestEnvironment + @TestEnv FlinkContainerTestEnvironment flink = new FlinkContainerTestEnvironment(1, 6); + + // Defines ConnectorExternalSystem + @TestExternalSystem + DefaultContainerizedExternalSystem<KafkaContainer> kafka = + DefaultContainerizedExternalSystem.builder() + .fromContainer( + new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA)) + .withNetworkAliases(KAFKA_HOSTNAME)) + .bindWithFlinkContainer(flink.getFlinkContainers().getJobManager()) + .build(); + + // Defines the external context factory. Test cases are invoked once per @TestContext + // field (see the sketch after this class). + @TestContext + KafkaSinkExternalContextFactory contextFactory = + new KafkaSinkExternalContextFactory( + kafka.getContainer(), + Arrays.asList( + TestUtils.getResource("kafka-connector.jar") + .toAbsolutePath() + .toUri() + .toURL(), + TestUtils.getResource("kafka-clients.jar") + .toAbsolutePath() + .toUri() + .toURL(), + TestUtils.getResource("flink-connector-testing.jar") + .toAbsolutePath() + .toUri() + .toURL())); + + public KafkaSinkE2ECase() throws Exception {} +}
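Test templates in the suite run once per @TestContext field, so a connector can exercise several external contexts by declaring additional factories. A sketch of what a second, hypothetical factory would look like (not part of this commit; java.util.Collections would also need to be imported):

    // Hypothetical second factory: with two @TestContext fields, every test
    // case in SinkTestSuiteBase is invoked once per factory.
    @TestContext
    KafkaSinkExternalContextFactory contextFactoryWithoutJars =
            new KafkaSinkExternalContextFactory(kafka.getContainer(), Collections.emptyList());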
diff --git a/flink-test-utils-parent/flink-connector-test-utils/pom.xml b/flink-test-utils-parent/flink-connector-test-utils/pom.xml index 13aeea7..b3c1067 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/pom.xml +++ b/flink-test-utils-parent/flink-connector-test-utils/pom.xml @@ -95,4 +95,30 @@ <scope>compile</scope> </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <shadedArtifactAttached>true</shadedArtifactAttached> + <shadedClassifierName>source</shadedClassifierName> + <artifactSet> + <includes> + <include>**/connector/testframe/source/**</include> + </includes> + </artifactSet> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java index c5f29d1..0ecbe7f 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java @@ -30,17 +30,16 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; import java.io.IOException; import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.util.Collections; -import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -161,7 +160,7 @@ public class MiniClusterTestEnvironment implements TestEnvironment, ClusterContr } isStarted = false; this.miniCluster.after(); - deletePath(checkpointPath); + FileUtils.deleteDirectory(checkpointPath.toFile()); LOG.debug("MiniCluster has been torn down"); } @@ -180,21 +179,4 @@ public class MiniClusterTestEnvironment implements TestEnvironment, ClusterContr public String toString() { return "MiniCluster"; } - - /** Deletes the given path recursively. */ - public static void deletePath(Path path) throws IOException { - final List<File> files = - Files.walk(path) - .filter(p -> p != path) - .map(Path::toFile) - .collect(Collectors.toList()); - for (File file : files) { - if (file.isDirectory()) { - deletePath(file.toPath()); - } else { - file.delete(); - } - } - Files.deleteIfExists(path); - } } diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java index 93d638c..25e638f 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java @@ -19,7 +19,6 @@ package org.apache.flink.connector.testframe.external.sink; import org.apache.flink.annotation.Experimental; -import org.apache.flink.api.connector.sink.Sink; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.connector.testframe.external.ExternalContext; import org.apache.flink.connector.testframe.external.ExternalSystemDataReader; @@ -34,15 +33,6 @@ import java.util.List; @Experimental public interface DataStreamSinkExternalContext<T> extends ExternalContext, ResultTypeQueryable<T> { - /** - * Create an instance of {@link Sink} satisfying given options. - * - * @param sinkSettings settings of the sink - * @throws UnsupportedOperationException if the provided option is not supported. - */ - Sink<T, ?, ?, ?> createSink(TestingSinkSettings sinkSettings) - throws UnsupportedOperationException; - - /** Create a reader for consuming data written to the external system by sink. 
*/ ExternalSystemDataReader<T> createSinkDataReader(TestingSinkSettings sinkSettings); diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV1ExternalContext.java similarity index 53% copy from flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java copy to flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV1ExternalContext.java index 93d638c..da36a22 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV1ExternalContext.java @@ -20,20 +20,14 @@ package org.apache.flink.connector.testframe.external.sink; import org.apache.flink.annotation.Experimental; import org.apache.flink.api.connector.sink.Sink; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.connector.testframe.external.ExternalContext; -import org.apache.flink.connector.testframe.external.ExternalSystemDataReader; - -import java.util.List; /** - * External context for DataStream sinks. + * External context for DataStream sinks whose version is V1. * * @param <T> Type of elements before serialization by sink */ @Experimental -public interface DataStreamSinkExternalContext<T> extends ExternalContext, ResultTypeQueryable<T> { - +public interface DataStreamSinkV1ExternalContext<T> extends DataStreamSinkExternalContext<T> { /** * Create an instance of {@link Sink} satisfying given options. * @@ -42,23 +36,4 @@ public interface DataStreamSinkExternalContext<T> extends ExternalContext, Resul */ Sink<T, ?, ?, ?> createSink(TestingSinkSettings sinkSettings) throws UnsupportedOperationException; - - /** Create a reader for consuming data written to the external system by sink. */ - ExternalSystemDataReader<T> createSinkDataReader(TestingSinkSettings sinkSettings); - - /** - * Generate test data. - * - * <p>These test data will be sent to sink via a special source in Flink job, write to external - * system by sink, consume back via {@link ExternalSystemDataReader}, and make comparison with - * {@link T#equals(Object)} for validating correctness. - * - * <p>Make sure that the {@link T#equals(Object)} returns false when the records in different - * splits. - * - * @param sinkSettings settings of the sink - * @param seed Seed for generating random test data set. - * @return List of generated test data. 
- */ - List<T> generateTestData(TestingSinkSettings sinkSettings, long seed); } diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV2ExternalContext.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV2ExternalContext.java new file mode 100644 index 0000000..ce2cf07 --- /dev/null +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV2ExternalContext.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.testframe.external.sink; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.connector.sink2.Sink; + +/** + * External context for DataStream sinks whose version is V2. + * + * @param <T> Type of elements before serialization by sink + */ +@Experimental +public interface DataStreamSinkV2ExternalContext<T> extends DataStreamSinkExternalContext<T> { + /** + * Create an instance of {@link Sink} satisfying given options. + * + * @param sinkSettings settings of the sink + * @throws UnsupportedOperationException if the provided option is not supported. + */ + Sink<T> createSink(TestingSinkSettings sinkSettings) throws UnsupportedOperationException; +} diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSource.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSource.java new file mode 100644 index 0000000..8f98e2c --- /dev/null +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSource.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.testframe.source; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.connector.testframe.source.enumerator.NoOpEnumState; +import org.apache.flink.connector.testframe.source.enumerator.NoOpEnumStateSerializer; +import org.apache.flink.connector.testframe.source.enumerator.NoOpEnumerator; +import org.apache.flink.connector.testframe.source.split.FromElementsSplit; +import org.apache.flink.connector.testframe.source.split.FromElementsSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.util.List; + +/** + * A {@link Source} implementation that reads data from a list and stops emitting at a fixed + * position. The source then waits until a checkpoint or savepoint is triggered, which makes it + * useful for connector tests. + * + * <p>Note: The parallelism of this source must be 1. + */ +public class FromElementsSource<OUT> implements Source<OUT, FromElementsSplit, NoOpEnumState> { + private Boundedness boundedness; + + private List<OUT> elements; + + private Integer emittedElementsNum; + + public FromElementsSource(List<OUT> elements) { + this.elements = elements; + } + + public FromElementsSource( + Boundedness boundedness, List<OUT> elements, Integer emittedElementsNum) { + this(elements); + if (emittedElementsNum != null) { + Preconditions.checkState( + emittedElementsNum <= elements.size(), + String.format( + "The emittedElementsNum must not be larger than the size of the elements list (%d), but the actual emittedElementsNum is %d", + elements.size(), emittedElementsNum)); + } + this.boundedness = boundedness; + this.emittedElementsNum = emittedElementsNum; + } + + @Override + public Boundedness getBoundedness() { + return boundedness == null ? 
Boundedness.BOUNDED : boundedness; + } + + @Override + public SourceReader<OUT, FromElementsSplit> createReader(SourceReaderContext readerContext) + throws Exception { + return new FromElementsSourceReader<>( + emittedElementsNum, elements, boundedness, readerContext); + } + + @Override + public SplitEnumerator<FromElementsSplit, NoOpEnumState> createEnumerator( + SplitEnumeratorContext<FromElementsSplit> enumContext) throws Exception { + return new NoOpEnumerator(); + } + + @Override + public SplitEnumerator<FromElementsSplit, NoOpEnumState> restoreEnumerator( + SplitEnumeratorContext<FromElementsSplit> enumContext, NoOpEnumState checkpoint) + throws Exception { + return new NoOpEnumerator(); + } + + @Override + public SimpleVersionedSerializer<FromElementsSplit> getSplitSerializer() { + return new FromElementsSplitSerializer(); + } + + @Override + public SimpleVersionedSerializer<NoOpEnumState> getEnumeratorCheckpointSerializer() { + return new NoOpEnumStateSerializer(); + } +} diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSourceReader.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSourceReader.java new file mode 100644 index 0000000..e00942a --- /dev/null +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSourceReader.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.testframe.source; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.testframe.source.split.FromElementsSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.metrics.Counter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.core.io.InputStatus.MORE_AVAILABLE; + +/** + * A {@link SourceReader} implementation that reads data from a list. If limitedNum is set, the + * reader will stop reading at the limitedNum position until the checkpoint or savepoint triggered. + */ +public class FromElementsSourceReader<T> implements SourceReader<T, FromElementsSplit> { + private static final Logger LOG = LoggerFactory.getLogger(FromElementsSourceReader.class); + + private volatile int emittedNum; + private volatile boolean isRunning = true; + + /** The context of this source reader. 
*/ + private SourceReaderContext context; + + private Integer limitedNum; + private Boundedness boundedness; + private volatile boolean checkpointAtLimitedNum = false; + private List<T> elements; + private Counter numRecordInCounter; + + public FromElementsSourceReader( + Integer limitedNum, + List<T> elements, + Boundedness boundedness, + SourceReaderContext context) { + this.context = context; + this.emittedNum = 0; + this.elements = elements; + this.limitedNum = limitedNum; + this.boundedness = boundedness; + this.numRecordInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter(); + } + + @Override + public void start() {} + + @Override + public InputStatus pollNext(ReaderOutput<T> output) throws Exception { + if (isRunning && emittedNum < elements.size()) { + /* + * The reader stops emitting when it has emitted `limitedNum` records. + * If and only if a checkpoint taken at exactly `limitedNum` emitted records + * is completed, the reader continues emitting. + * + * When checkpointing is disabled and the job is stopped with a savepoint + * after receiving `limitedNum` records, a job started from that savepoint + * will continue to read the records after the first `limitedNum` records. + */ + if (limitedNum == null + || (limitedNum != null + && (emittedNum < limitedNum || checkpointAtLimitedNum))) { + output.collect(elements.get(emittedNum)); + emittedNum++; + numRecordInCounter.inc(); + } + return MORE_AVAILABLE; + } + + if (Boundedness.CONTINUOUS_UNBOUNDED.equals(boundedness)) { + return MORE_AVAILABLE; + } else { + return InputStatus.END_OF_INPUT; + } + } + + @Override + public List<FromElementsSplit> snapshotState(long checkpointId) { + if (limitedNum != null && !checkpointAtLimitedNum && emittedNum == limitedNum) { + checkpointAtLimitedNum = true; + LOG.info("checkpoint {} is the target checkpoint to be used.", checkpointId); + } + return Arrays.asList(new FromElementsSplit(emittedNum)); + } + + @Override + public CompletableFuture<Void> isAvailable() { + return CompletableFuture.completedFuture(null); + } + + @Override + public void addSplits(List<FromElementsSplit> splits) { + emittedNum = splits.get(0).getEmitNum(); + LOG.info("FromElementsSourceReader restores from {}.", emittedNum); + } + + @Override + public void notifyNoMoreSplits() {} + + @Override + public void close() throws Exception { + isRunning = false; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + LOG.info("checkpoint {} finished.", checkpointId); + } +}
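Together, FromElementsSource and FromElementsSourceReader give the suite a deterministic source: with a record limit set, emission pauses at the limit until a checkpoint taken at exactly that point completes. A minimal sketch of wiring the source into a test job (illustrative values, imports elided):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1); // the source requires parallelism 1
    env.enableCheckpointing(100L); // emission beyond the limit resumes only after a checkpoint
    List<String> records = Arrays.asList("alpha", "beta", "gamma", "delta");
    DataStream<String> stream =
            env.fromSource(
                    // emit 2 records, then wait for a completed checkpoint before the rest
                    new FromElementsSource<>(Boundedness.CONTINUOUS_UNBOUNDED, records, 2),
                    WatermarkStrategy.noWatermarks(),
                    "from-elements-source");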
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumState.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumState.java new file mode 100644 index 0000000..a2bb8cc --- /dev/null +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumState.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.testframe.source.enumerator; + +/** Mock enumerator state. */ +public class NoOpEnumState {} diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumStateSerializer.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumStateSerializer.java new file mode 100644 index 0000000..7be0e8d --- /dev/null +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumStateSerializer.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.testframe.source.enumerator; + +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.IOException; + +/** Mock enumerator state serializer. */ +public class NoOpEnumStateSerializer implements SimpleVersionedSerializer<NoOpEnumState> { + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(NoOpEnumState obj) throws IOException { + return new byte[0]; + } + + @Override + public NoOpEnumState deserialize(int version, byte[] serialized) throws IOException { + return new NoOpEnumState(); + } +} diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumerator.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumerator.java new file mode 100644 index 0000000..b23bfa7 --- /dev/null +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumerator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.testframe.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.connector.testframe.source.split.FromElementsSplit; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; + +/** Mock enumerator. */ +public class NoOpEnumerator implements SplitEnumerator<FromElementsSplit, NoOpEnumState> { + @Override + public void start() {} + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {} + + @Override + public void addSplitsBack(List<FromElementsSplit> splits, int subtaskId) {} + + @Override + public void addReader(int subtaskId) {} + + @Override + public NoOpEnumState snapshotState(long checkpointId) throws Exception { + return new NoOpEnumState(); + } + + @Override + public void close() throws IOException {} +} diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplit.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplit.java new file mode 100644 index 0000000..d5ae8ad --- /dev/null +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplit.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.testframe.source.split; + +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.connector.testframe.source.FromElementsSource; + +/** The split of the {@link FromElementsSource}. 
*/ +public class FromElementsSplit implements SourceSplit { + public static final String SPLIT_ID = "fakeSplitId"; + + private int emitNum; + + public FromElementsSplit(int emitNum) { + this.emitNum = emitNum; + } + + public int getEmitNum() { + return emitNum; + } + + public void setEmitNum(int emitNum) { + this.emitNum = emitNum; + } + + @Override + public String splitId() { + return SPLIT_ID; + } +} diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplitSerializer.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplitSerializer.java new file mode 100644 index 0000000..ab32917 --- /dev/null +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplitSerializer.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.testframe.source.split; + +import org.apache.flink.connector.testframe.source.FromElementsSource; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +/** The split serializer for the {@link FromElementsSource}. 
*/ +public class FromElementsSplitSerializer implements SimpleVersionedSerializer<FromElementsSplit> { + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(FromElementsSplit split) throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos)) { + out.writeInt(split.getEmitNum()); + out.flush(); + return baos.toByteArray(); + } + } + + @Override + public FromElementsSplit deserialize(int version, byte[] serialized) throws IOException { + try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + DataInputStream in = new DataInputStream(bais)) { + int emitNum = in.readInt(); + return new FromElementsSplit(emitNum); + } + } +} diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java new file mode 100644 index 0000000..2c13e83 --- /dev/null +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java @@ -0,0 +1,643 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.testframe.testsuites; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.testframe.environment.TestEnvironment; +import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings; +import org.apache.flink.connector.testframe.external.ExternalSystemDataReader; +import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext; +import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV1ExternalContext; +import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext; +import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings; +import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension; +import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider; +import org.apache.flink.connector.testframe.source.FromElementsSource; +import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions; +import org.apache.flink.connector.testframe.utils.MetricQuerier; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.collect.CollectResultIterator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory; +import org.apache.flink.streaming.api.operators.collect.CollectStreamSink; +import org.apache.flink.util.TestLoggerExtension; + +import org.apache.commons.math3.util.Precision; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.opentest4j.TestAbortedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_JOB_STATUS_CHANGE_TIMEOUT; +import static org.apache.flink.connector.testframe.utils.MetricQuerier.getJobDetails; +import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob; +import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning; +import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus; +import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition; +import static org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE; +import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** + * Base class for sink test suites. + * + * <p>All cases should have descriptive JavaDoc, including: + * + * <ul> + * <li>The purpose of this case + * <li>A simple description of how this case works + * <li>The condition to fulfill in order to pass this case + * <li>The requirements for running this case + * </ul> + */ +@ExtendWith({ + ConnectorTestingExtension.class, + TestLoggerExtension.class, + TestCaseInvocationContextProvider.class +}) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Experimental +public abstract class SinkTestSuiteBase<T extends Comparable<T>> { + private static final Logger LOG = LoggerFactory.getLogger(SinkTestSuiteBase.class); + + // ----------------------------- Basic test cases --------------------------------- + + /** + * Test DataStream connector sink. + * + * <p>This test will create a sink in the external system, generate a collection of test data + * and write them to this sink with a Flink job. + * + * <p>In order to pass this test, the number of records produced by Flink needs to equal the + * number of generated test records, and the records in the sink will be compared to the test + * data under the configured semantic. There's no requirement on record order. + */ + @TestTemplate + @DisplayName("Test data stream sink") + public void testBasicSink( + TestEnvironment testEnv, + DataStreamSinkExternalContext<T> externalContext, + CheckpointingMode semantic) + throws Exception { + TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic); + final List<T> testRecords = generateTestData(sinkSettings, externalContext); + + // Build and execute Flink job + StreamExecutionEnvironment execEnv = + testEnv.createExecutionEnvironment( + TestEnvironmentSettings.builder() + .setConnectorJarPaths(externalContext.getConnectorJarPaths()) + .build()); + execEnv.enableCheckpointing(50); + DataStream<T> dataStream = + execEnv.fromCollection(testRecords) + .name("sourceInSinkTest") + .setParallelism(1) + .returns(externalContext.getProducedType()); + tryCreateSink(dataStream, externalContext, sinkSettings) + .setParallelism(1) + .name("sinkInSinkTest"); + final JobClient jobClient = execEnv.executeAsync("DataStream Sink Test"); + + waitForJobStatus( + jobClient, + Collections.singletonList(JobStatus.FINISHED), + Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT)); + + // Check test result + checkResultWithSemantic( + externalContext.createSinkDataReader(sinkSettings), testRecords, semantic); + } + + /** + * Test connector sink restart from a completed savepoint with the same parallelism.
+ * + * <p>This test will create a sink in the external system, generate a collection of test data + * and write the first half of them to this sink with a Flink job with parallelism 2. Then + * stop the job and restart the same job from the completed savepoint. Once the restarted job + * is running, write the other half to the sink and compare the results. + * + * <p>In order to pass this test, the number of records produced by Flink needs to equal the + * number of generated test records, and the records in the sink will be compared to the test + * data under the configured semantic. There's no requirement on record order. + */ + @TestTemplate + @DisplayName("Test sink restarting from a savepoint") + public void testStartFromSavepoint( + TestEnvironment testEnv, + DataStreamSinkExternalContext<T> externalContext, + CheckpointingMode semantic) + throws Exception { + restartFromSavepoint(testEnv, externalContext, semantic, 2, 2); + } + + /** + * Test connector sink restart from a completed savepoint with a higher parallelism. + * + * <p>This test will create a sink in the external system, generate a collection of test data + * and write the first half of them to this sink with a Flink job with parallelism 2. Then + * stop the job and restart the same job from the completed savepoint with a higher parallelism + * of 4. Once the restarted job is running, write the other half to the sink and compare the + * results. + * + * <p>In order to pass this test, the number of records produced by Flink needs to equal the + * number of generated test records, and the records in the sink will be compared to the test + * data under the configured semantic. There's no requirement on record order. + */ + @TestTemplate + @DisplayName("Test sink restarting with a higher parallelism") + public void testScaleUp( + TestEnvironment testEnv, + DataStreamSinkExternalContext<T> externalContext, + CheckpointingMode semantic) + throws Exception { + restartFromSavepoint(testEnv, externalContext, semantic, 2, 4); + } + + /** + * Test connector sink restart from a completed savepoint with a lower parallelism. + * + * <p>This test will create a sink in the external system, generate a collection of test data + * and write the first half of them to this sink with a Flink job with parallelism 4. Then + * stop the job and restart the same job from the completed savepoint with a lower parallelism + * of 2. Once the restarted job is running, write the other half to the sink and compare the + * results. + * + * <p>In order to pass this test, the number of records produced by Flink needs to equal the + * number of generated test records, and the records in the sink will be compared to the test + * data under the configured semantic. There's no requirement on record order.
+ */ + @TestTemplate + @DisplayName("Test sink restarting with a lower parallelism") + public void testScaleDown( + TestEnvironment testEnv, + DataStreamSinkExternalContext<T> externalContext, + CheckpointingMode semantic) + throws Exception { + restartFromSavepoint(testEnv, externalContext, semantic, 4, 2); + } + + private void restartFromSavepoint( + TestEnvironment testEnv, + DataStreamSinkExternalContext<T> externalContext, + CheckpointingMode semantic, + final int beforeParallelism, + final int afterParallelism) + throws Exception { + // Step 1: Preparation + TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic); + final StreamExecutionEnvironment execEnv = + testEnv.createExecutionEnvironment( + TestEnvironmentSettings.builder() + .setConnectorJarPaths(externalContext.getConnectorJarPaths()) + .build()); + execEnv.setRestartStrategy(RestartStrategies.noRestart()); + + // Step 2: Generate test data + final List<T> testRecords = generateTestData(sinkSettings, externalContext); + + // Step 3: Build and execute Flink job + int numBeforeSuccess = testRecords.size() / 2; + DataStreamSource<T> source = + execEnv.fromSource( + new FromElementsSource<>( + Boundedness.CONTINUOUS_UNBOUNDED, + testRecords, + numBeforeSuccess), + WatermarkStrategy.noWatermarks(), + "beforeRestartSource") + .setParallelism(1); + + DataStream<T> dataStream = source.returns(externalContext.getProducedType()); + tryCreateSink(dataStream, externalContext, sinkSettings) + .name("Sink restart test") + .setParallelism(beforeParallelism); + + // The job should stop after consuming a specified number of records. In order to know + // when the specified number of records have been consumed, a collect sink needs to be + // watched. + CollectResultIterator<T> iterator = addCollectSink(source); + final JobClient jobClient = execEnv.executeAsync("Restart Test"); + iterator.setJobClient(jobClient); + + // Step 4: Wait for the expected result and stop Flink job with a savepoint + final ExecutorService executorService = Executors.newCachedThreadPool(); + String savepointPath; + try { + waitForAllTaskRunning( + () -> + getJobDetails( + new RestClient(new Configuration(), executorService), + testEnv.getRestEndpoint(), + jobClient.getJobID()), + Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT)); + + waitExpectedSizeData(iterator, numBeforeSuccess); + + savepointPath = + jobClient + .stopWithSavepoint( + true, testEnv.getCheckpointUri(), SavepointFormatType.CANONICAL) + .get(30, TimeUnit.SECONDS); + waitForJobStatus( + jobClient, + Collections.singletonList(JobStatus.FINISHED), + Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT)); + } catch (Exception e) { + executorService.shutdown(); + killJob(jobClient); + throw e; + } + + List<T> target = testRecords.subList(0, numBeforeSuccess); + checkResultWithSemantic( + externalContext.createSinkDataReader(sinkSettings), target, semantic); + + // Step 5: Restart the Flink job from the savepoint + final StreamExecutionEnvironment restartEnv = + testEnv.createExecutionEnvironment( + TestEnvironmentSettings.builder() + .setConnectorJarPaths(externalContext.getConnectorJarPaths()) + .setSavepointRestorePath(savepointPath) + .build()); + restartEnv.enableCheckpointing(50); + + DataStreamSource<T> restartSource = + restartEnv + .fromSource( + new FromElementsSource<>( + Boundedness.CONTINUOUS_UNBOUNDED, + testRecords, + testRecords.size()), + WatermarkStrategy.noWatermarks(), + "restartSource") + .setParallelism(1); + + DataStream<T> sinkStream =
restartSource.returns(externalContext.getProducedType()); + tryCreateSink(sinkStream, externalContext, sinkSettings).setParallelism(afterParallelism); + addCollectSink(restartSource); + final JobClient restartJobClient = restartEnv.executeAsync("Restart Test"); + + try { + // Check the result + checkResultWithSemantic( + externalContext.createSinkDataReader(sinkSettings), testRecords, semantic); + } finally { + executorService.shutdown(); + killJob(restartJobClient); + iterator.close(); + } + } + + /** + * Test connector sink metrics. + * + * <p>This test will create a sink in the external system, generate test data and write them to + * the sink via a Flink job. Then read and compare the metrics. + * + * <p>Currently tested metric: numRecordsOut + */ + @TestTemplate + @DisplayName("Test sink metrics") + public void testMetrics( + TestEnvironment testEnv, + DataStreamSinkExternalContext<T> externalContext, + CheckpointingMode semantic) + throws Exception { + TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic); + int parallelism = 1; + final List<T> testRecords = generateTestData(sinkSettings, externalContext); + + // make sure to use different names when the test is executed multiple times + String sinkName = "metricTestSink" + testRecords.hashCode(); + final StreamExecutionEnvironment env = + testEnv.createExecutionEnvironment( + TestEnvironmentSettings.builder() + .setConnectorJarPaths(externalContext.getConnectorJarPaths()) + .build()); + env.enableCheckpointing(50); + + DataStreamSource<T> source = + env.fromSource( + new FromElementsSource<>( + Boundedness.CONTINUOUS_UNBOUNDED, + testRecords, + testRecords.size()), + WatermarkStrategy.noWatermarks(), + "metricTestSource") + .setParallelism(1); + + DataStream<T> dataStream = source.returns(externalContext.getProducedType()); + tryCreateSink(dataStream, externalContext, sinkSettings) + .name(sinkName) + .setParallelism(parallelism); + final JobClient jobClient = env.executeAsync("Metrics Test"); + final MetricQuerier queryRestClient = new MetricQuerier(new Configuration()); + final ExecutorService executorService = Executors.newCachedThreadPool(); + try { + waitForAllTaskRunning( + () -> + getJobDetails( + new RestClient(new Configuration(), executorService), + testEnv.getRestEndpoint(), + jobClient.getJobID()), + Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT)); + + waitUntilCondition( + () -> { + // test metrics + try { + return compareSinkMetrics( + queryRestClient, + testEnv, + externalContext, + jobClient.getJobID(), + sinkName, + testRecords.size()); + } catch (Exception e) { + // ignore the failed check and retry + return false; + } + }, + Deadline.fromNow(DEFAULT_COLLECT_DATA_TIMEOUT)); + } finally { + // Clean up + executorService.shutdown(); + killJob(jobClient); + } + } + + // ----------------------------- Helper Functions --------------------------------- + + /** + * Generate a set of test records. + * + * @param testingSinkSettings sink settings + * @param externalContext External context + * @return Collection of generated test records + */ + protected List<T> generateTestData( + TestingSinkSettings testingSinkSettings, + DataStreamSinkExternalContext<T> externalContext) { + return externalContext.generateTestData( + testingSinkSettings, ThreadLocalRandom.current().nextLong()); + } + + /** + * Poll records from the sink.
+ * + * @param result The list to which the polled records are appended + * @param reader The sink reader + * @param expected The expected records, used to decide when to stop polling + * @param retryTimes The number of retries + * @param semantic The checkpointing semantic + * @return Collection of records in the sink + */ + private List<T> pollAndAppendResultData( + List<T> result, + ExternalSystemDataReader<T> reader, + List<T> expected, + int retryTimes, + CheckpointingMode semantic) { + long timeoutMs = 1000L; + int retryIndex = 0; + + while (retryIndex++ < retryTimes + && !checkGetEnoughRecordsWithSemantic(expected, result, semantic)) { + result.addAll(reader.poll(Duration.ofMillis(timeoutMs))); + } + return result; + } + + /** + * Check whether the polling should stop. + * + * @param expected The expected records, used to decide when to stop polling + * @param result The records that have been read + * @param semantic The checkpointing semantic + * @return Whether the polling should stop + */ + private boolean checkGetEnoughRecordsWithSemantic( + List<T> expected, List<T> result, CheckpointingMode semantic) { + checkNotNull(expected); + checkNotNull(result); + if (EXACTLY_ONCE.equals(semantic)) { + return expected.size() <= result.size(); + } else if (AT_LEAST_ONCE.equals(semantic)) { + Set<Integer> matchedIndex = new HashSet<>(); + for (T record : expected) { + int before = matchedIndex.size(); + for (int i = 0; i < result.size(); i++) { + if (matchedIndex.contains(i)) { + continue; + } + if (record.equals(result.get(i))) { + matchedIndex.add(i); + break; + } + } + // the expected record was not found in the result + if (before == matchedIndex.size()) { + return false; + } + } + return true; + } + throw new IllegalStateException( + String.format("The test does not support the %s delivery guarantee.", semantic.name())); + } + + /** + * Compare the test data with the actual data under the given semantic. + * + * @param reader the data reader for the sink + * @param testData the test data + * @param semantic the supported semantic, see {@link CheckpointingMode} + */ + private void checkResultWithSemantic( + ExternalSystemDataReader<T> reader, List<T> testData, CheckpointingMode semantic) + throws Exception { + final ArrayList<T> result = new ArrayList<>(); + waitUntilCondition( + () -> { + pollAndAppendResultData(result, reader, testData, 30, semantic); + try { + CollectIteratorAssertions.assertThat(sort(result).iterator()) + .matchesRecordsFromSource(Arrays.asList(sort(testData)), semantic); + return true; + } catch (Throwable t) { + return false; + } + }, + Deadline.fromNow(DEFAULT_COLLECT_DATA_TIMEOUT)); + } + + /** Compare the metrics. */ + private boolean compareSinkMetrics( + MetricQuerier metricQuerier, + TestEnvironment testEnv, + DataStreamSinkExternalContext<T> context, + JobID jobId, + String sinkName, + long allRecordSize) + throws Exception { + double sumNumRecordsOut = + metricQuerier.getAggregatedMetricsByRestAPI( + testEnv.getRestEndpoint(), + jobId, + sinkName, + MetricNames.IO_NUM_RECORDS_OUT, + getSinkMetricFilter(context)); + return Precision.equals(allRecordSize, sumNumRecordsOut); + } + + /** Sort the list.
*/ + private List<T> sort(List<T> list) { + return list.stream().sorted().collect(Collectors.toList()); + } + + private TestingSinkSettings getTestingSinkSettings(CheckpointingMode checkpointingMode) { + return TestingSinkSettings.builder().setCheckpointingMode(checkpointingMode).build(); + } + + private void killJob(JobClient jobClient) throws Exception { + terminateJob(jobClient); + waitForJobStatus( + jobClient, + Collections.singletonList(JobStatus.CANCELED), + Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT)); + } + + private DataStreamSink<T> tryCreateSink( + DataStream<T> dataStream, + DataStreamSinkExternalContext<T> context, + TestingSinkSettings sinkSettings) { + try { + if (context instanceof DataStreamSinkV1ExternalContext) { + org.apache.flink.api.connector.sink.Sink<T, ?, ?, ?> sinkV1 = + ((DataStreamSinkV1ExternalContext<T>) context).createSink(sinkSettings); + return dataStream.sinkTo(sinkV1); + } else if (context instanceof DataStreamSinkV2ExternalContext) { + Sink<T> sinkV2 = + ((DataStreamSinkV2ExternalContext<T>) context).createSink(sinkSettings); + return dataStream.sinkTo(sinkV2); + } else { + throw new IllegalArgumentException( + String.format( + "The supported contexts are DataStreamSinkV1ExternalContext and DataStreamSinkV2ExternalContext, but the actual context is %s.", + context.getClass())); + } + } catch (UnsupportedOperationException e) { + // abort the test + throw new TestAbortedException("Cannot create a sink satisfying the given options.", e); + } + } + + /** + * Return the filter used to select the sink metric. + * + * <ul> + * <li>Sink v1: return null. + * <li>Sink v2: return the "Writer" prefix used in {@code SinkTransformationTranslator}. + * </ul> + */ + private String getSinkMetricFilter(DataStreamSinkExternalContext<T> context) { + if (context instanceof DataStreamSinkV1ExternalContext) { + return null; + } else if (context instanceof DataStreamSinkV2ExternalContext) { + // See class `SinkTransformationTranslator` + return "Writer"; + } else { + throw new IllegalArgumentException( + String.format("Unexpected sink context: %s", context.getClass())); + } + } + + protected CollectResultIterator<T> addCollectSink(DataStream<T> stream) { + TypeSerializer<T> serializer = + stream.getType().createSerializer(stream.getExecutionConfig()); + String accumulatorName = "dataStreamCollect_" + UUID.randomUUID(); + CollectSinkOperatorFactory<T> factory = + new CollectSinkOperatorFactory<>(serializer, accumulatorName); + CollectSinkOperator<T> operator = (CollectSinkOperator<T>) factory.getOperator(); + CollectStreamSink<T> sink = new CollectStreamSink<>(stream, factory); + sink.name("Data stream collect sink"); + stream.getExecutionEnvironment().addOperator(sink.getTransformation()); + return new CollectResultIterator<>( + operator.getOperatorIdFuture(), + serializer, + accumulatorName, + stream.getExecutionEnvironment().getCheckpointConfig()); + } + + private void waitExpectedSizeData(CollectResultIterator<T> iterator, int targetNum) { + assertThat( + CompletableFuture.supplyAsync( + () -> { + int count = 0; + while (count < targetNum && iterator.hasNext()) { + iterator.next(); + count++; + } + if (count < targetNum) { + throw new IllegalStateException( + String.format( + "Failed to get %d records.", targetNum)); + } + return true; + })) + .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT); + } +}
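For connector modules, adopting the suite mirrors the existing source suite wiring: declare the test environment, external system, semantics, and context factory with the framework annotations and extend SinkTestSuiteBase. Below is a minimal sketch of such a subclass; MySinkContextFactory and the container image are hypothetical placeholders (a real module would supply its own factory, e.g. the KafkaSinkExternalContextFactory added by this commit), while the annotations, environment, and base class are the framework pieces touched here.

import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
import org.apache.flink.streaming.api.CheckpointingMode;

import org.testcontainers.containers.GenericContainer;

public class MySinkE2ECase extends SinkTestSuiteBase<String> {

    // Flink runtime under test: an in-JVM MiniCluster.
    @TestEnv
    MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();

    // External system the sink writes to; the container image is a placeholder.
    @TestExternalSystem
    DefaultContainerizedExternalSystem<?> system =
            DefaultContainerizedExternalSystem.builder()
                    .fromContainer(new GenericContainer<>("my-sink-system:latest"))
                    .build();

    // Every @TestTemplate case in the suite runs once per declared semantic.
    @TestSemantics
    CheckpointingMode[] semantics =
            new CheckpointingMode[] {
                CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
            };

    // Hypothetical factory producing DataStreamSinkV1/V2ExternalContext instances.
    @TestContext
    MySinkContextFactory contexts = new MySinkContextFactory();
}

diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java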
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java index 992e12a..a90d05b 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java @@ -41,6 +41,7 @@ import org.apache.flink.connector.testframe.utils.MetricQuerier; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.rest.RestClient; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; @@ -70,11 +71,14 @@ import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_JOB_STATUS_CHANGE_TIMEOUT; +import static org.apache.flink.connector.testframe.utils.MetricQuerier.getJobDetails; import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob; import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning; import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus; @@ -447,11 +451,14 @@ public abstract class SourceTestSuiteBase<T> { final JobClient jobClient = env.executeAsync("Metrics Test"); final MetricQuerier queryRestClient = new MetricQuerier(new Configuration()); + final ExecutorService executorService = Executors.newCachedThreadPool(); try { waitForAllTaskRunning( () -> - queryRestClient.getJobDetails( - testEnv.getRestEndpoint(), jobClient.getJobID()), + getJobDetails( + new RestClient(new Configuration(), executorService), + testEnv.getRestEndpoint(), + jobClient.getJobID()), Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT)); waitUntilCondition( @@ -472,6 +479,7 @@ public abstract class SourceTestSuiteBase<T> { Deadline.fromNow(DEFAULT_COLLECT_DATA_TIMEOUT)); } finally { // Clean up + executorService.shutdown(); killJob(jobClient); } } @@ -775,7 +783,8 @@ public abstract class SourceTestSuiteBase<T> { testEnv.getRestEndpoint(), jobId, sourceName, - MetricNames.IO_NUM_RECORDS_IN); + MetricNames.IO_NUM_RECORDS_IN, + null); return Precision.equals(allRecordSize, sumNumRecordsIn); } diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQuerier.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQuerier.java index 9fe13c1..d749132 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQuerier.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQuerier.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; import 
org.apache.flink.runtime.rest.messages.MessagePathParameter; import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters; @@ -40,6 +41,9 @@ import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + +import java.util.Collection; import java.util.Iterator; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -56,16 +60,15 @@ public class MetricQuerier { restClient = new RestClient(configuration, Executors.newCachedThreadPool()); } - public JobDetailsInfo getJobDetails(TestEnvironment.Endpoint endpoint, JobID jobId) - throws Exception { + public static JobDetailsInfo getJobDetails( + RestClient client, TestEnvironment.Endpoint endpoint, JobID jobId) throws Exception { String jmAddress = endpoint.getAddress(); int jmPort = endpoint.getPort(); final JobMessageParameters params = new JobMessageParameters(); params.jobPathParameter.resolve(jobId); - return restClient - .sendRequest( + return client.sendRequest( jmAddress, jmPort, JobDetailsHeaders.getInstance(), @@ -120,10 +123,11 @@ public class MetricQuerier { TestEnvironment.Endpoint endpoint, JobID jobId, String sourceOrSinkName, - String metricName) + String metricName, + String filter) throws Exception { // get job details, including the vertex id - JobDetailsInfo jobDetailsInfo = getJobDetails(endpoint, jobId); + JobDetailsInfo jobDetailsInfo = getJobDetails(restClient, endpoint, jobId); // get the vertex id for source/sink operator JobDetailsInfo.JobVertexDetailsInfo vertex = @@ -143,8 +147,8 @@ public class MetricQuerier { metricsResponseBody.getMetrics().stream() .filter( m -> - m.getId().endsWith(metricName) - && m.getId().contains(sourceOrSinkName)) + filterByMetricName( + m.getId(), sourceOrSinkName, metricName, filter)) .map(m -> m.getId()) .collect(Collectors.joining(",")); @@ -157,6 +161,27 @@ public class MetricQuerier { AggregatedMetricsResponseBody metricsResponse = getMetrics(endpoint, jobId, vertexId, queryParam); - return metricsResponse.getMetrics().iterator().next().getSum(); + + Collection<AggregatedMetric> metrics = metricsResponse.getMetrics(); + if (metrics == null || metrics.isEmpty()) { + throw new IllegalStateException( + String.format( + "Cannot find metric [%s] for operator [%s] with filter [%s].", + metricName, sourceOrSinkName, filter)); + } + return metrics.iterator().next().getSum(); + } + + private boolean filterByMetricName( + String metricName, + String sourceOrSinkName, + String targetMetricName, + @Nullable String filter) { + boolean filterByName = + metricName.endsWith(targetMetricName) && metricName.contains(sourceOrSinkName); + if (!StringUtils.isNullOrWhitespaceOnly(filter)) { + return filterByName && metricName.contains(filter); + } + return filterByName; } }
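With the new filter parameter, callers can distinguish sink-v2 writer metrics from other operators sharing the same name prefix. A minimal sketch of querying a sink's numRecordsOut through the extended API; the endpoint, job id, and operator name are assumed to come from a running test environment, and "Writer" mirrors the sink-v2 filter returned by getSinkMetricFilter above (pass null for a sink-v1 operator):

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.testframe.environment.TestEnvironment;
import org.apache.flink.connector.testframe.utils.MetricQuerier;
import org.apache.flink.runtime.metrics.MetricNames;

class SinkMetricProbe {
    // endpoint and jobId are assumed to come from a running TestEnvironment / JobClient.
    static double numRecordsOut(TestEnvironment.Endpoint endpoint, JobID jobId) throws Exception {
        return new MetricQuerier(new Configuration())
                .getAggregatedMetricsByRestAPI(
                        endpoint,
                        jobId,
                        "sinkInSinkTest", // operator name given to the sink in testBasicSink
                        MetricNames.IO_NUM_RECORDS_OUT,
                        "Writer"); // sink-v2 filter; null for sink v1
    }
}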

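The FromElementsSplit introduced above carries only an emit count, so its serializer can be sanity-checked in isolation. A self-contained sketch that round-trips a split through FromElementsSplitSerializer (the split and serializer classes are from this commit; the main class itself is illustrative):

import org.apache.flink.connector.testframe.source.split.FromElementsSplit;
import org.apache.flink.connector.testframe.source.split.FromElementsSplitSerializer;

public class FromElementsSplitRoundTrip {
    public static void main(String[] args) throws Exception {
        FromElementsSplitSerializer serializer = new FromElementsSplitSerializer();
        FromElementsSplit split = new FromElementsSplit(42);

        // serialize() writes only the emit count as a single int.
        byte[] bytes = serializer.serialize(split);

        // deserialize() restores the split; the version argument is currently ignored.
        FromElementsSplit restored = serializer.deserialize(serializer.getVersion(), bytes);

        if (restored.getEmitNum() != split.getEmitNum()) {
            throw new AssertionError("Round trip lost the emit count");
        }
        System.out.println("emitNum survives the round trip: " + restored.getEmitNum());
    }
}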