This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 613a824c33d25925751bb875c8c54d9f50b7efe4 Author: Qingsheng Ren <[email protected]> AuthorDate: Thu Aug 12 17:37:00 2021 +0800 [FLINK-19554][connector/testing-framework] Basic abstractions of connector testing framework --- .../flink-connector-testing/README.md | 44 ++++++++++++ .../flink-connector-testing/pom.xml | 79 ++++++++++++++++++++++ .../flink/connectors/test/common/TestResource.java | 53 +++++++++++++++ .../common/environment/ClusterControllable.java | 33 +++++++++ .../test/common/environment/TestEnvironment.java | 35 ++++++++++ .../test/common/external/ExternalContext.java | 75 ++++++++++++++++++++ .../common/external/SourceSplitDataWriter.java | 34 ++++++++++ flink-test-utils-parent/pom.xml | 1 + 8 files changed, 354 insertions(+) diff --git a/flink-test-utils-parent/flink-connector-testing/README.md b/flink-test-utils-parent/flink-connector-testing/README.md new file mode 100644 index 0000000..e6d2e8f --- /dev/null +++ b/flink-test-utils-parent/flink-connector-testing/README.md @@ -0,0 +1,44 @@ +# Flink Connector Testing Framework + +This module is dedicated to providing a unified and easy-to-use testing framework for all +Flink-managed and third-party connectors in the Flink ecosystem. + +## Overview + +The testing framework is based on JUnit 5 for test case running and resource lifecycle management. The +design goal of the framework is: connector developers only need to provide an instance of their +source/sink and its related external system, then the framework will deal with tests with bundled +IT and end-to-end test cases. + +## Interfaces + +### Test Environment + +Test environment provides an execution environment of running the Flink job for testing (mini +cluster, local standalone cluster, Flink on Docker...), also a configuration for configuring the +environment and passing other required information into the test scenario. + +The framework provides MiniCluster and Flink on Docker out of the box. 
You can also implement your own +test environment by implementing interface ```TestEnvironment```. + +### External System & Context + +You can define your own external system by implementing interface ```ExternalSystem```, and the testing +framework will handle the lifecycle of the external system automatically. + +External context defines how to interact with the external system, providing instance of source and +connecting to the external system, test data and a writer for producing data into external system. + +## Using Testing Framework in Your Connector + +The testing framework uses JUnit 5 features to support running test cases. We provide a base class +```SourceTestSuiteBase``` with some basic test cases. You can start simply by extending this base class, +instantiate test environment, external system and context in your test class constructor, and +annotate them with ```@WithTestEnvironment```, ```@WithExternalSystem``` +and ```@WithExternalContextFactory```. + +You can refer to ```KafkaSourceE2ECase``` as an example for using the testing framework. + +Also, you can develop your own test cases in your test class. Just simply annotate your test cases +with ```@Case```, and the testing framework will inject test environments and external systems into your +cases. diff --git a/flink-test-utils-parent/flink-connector-testing/pom.xml b/flink-test-utils-parent/flink-connector-testing/pom.xml new file mode 100644 index 0000000..3fab36b --- /dev/null +++ b/flink-test-utils-parent/flink-connector-testing/pom.xml @@ -0,0 +1,79 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils-parent</artifactId> + <version>1.14-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-testing_${scala.binary.version}</artifactId> + <name>Flink : Test utils : Testing Framework</name> + + <dependencies> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + </dependency> + </dependencies> + + <dependencyManagement> + <dependencies> + <!-- We pick an arbitrary version of net.java.dev.jna:jna to satisfy dependency + convergence for org.testcontainers:testcontainers which transitively depends on + two different 
versions.--> + <dependency> + <groupId>net.java.dev.jna</groupId> + <artifactId>jna</artifactId> + <version>5.5.0</version> + </dependency> + </dependencies> + </dependencyManagement> +</project> diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/TestResource.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/TestResource.java new file mode 100644 index 0000000..05d0174 --- /dev/null +++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/TestResource.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connectors.test.common; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.connectors.test.common.environment.TestEnvironment; +import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem; +import org.apache.flink.connectors.test.common.junit.annotations.TestEnv; + +/** + * Basic abstractions for all resources used in connector testing framework, including {@link + * TestEnvironment} annotated by {@link TestEnv} and external system annotated by {@link + * ExternalSystem}. + * + * <p>Lifecycle of test resources will be managed by the framework. + */ +@Experimental +public interface TestResource { + + /** + * Start up the test resource. + * + * <p>The implementation of this method should be idempotent. + * + * @throws Exception if anything wrong when starting the resource + */ + void startUp() throws Exception; + + /** + * Tear down the test resource. + * + * <p>The test resource should be able to tear down even without a startup (could be a no-op). + * + * @throws Exception if anything wrong when tearing the resource down + */ + void tearDown() throws Exception; +} diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/ClusterControllable.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/ClusterControllable.java new file mode 100644 index 0000000..df68b42 --- /dev/null +++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/ClusterControllable.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.test.common.environment; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.core.execution.JobClient; + +/** Interface for triggering failover in a Flink cluster. */ +@Experimental +public interface ClusterControllable { + + void triggerJobManagerFailover(JobClient jobClient, Runnable afterFailAction) throws Exception; + + void triggerTaskManagerFailover(JobClient jobClient, Runnable afterFailAction) throws Exception; + + void isolateNetwork(JobClient jobClient, Runnable afterFailAction) throws Exception; +} diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/TestEnvironment.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/TestEnvironment.java new file mode 100644 index 0000000..2c47da2 --- /dev/null +++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/TestEnvironment.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.test.common.environment; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.connectors.test.common.TestResource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** Test environment for running Flink jobs. */ +@Experimental +public interface TestEnvironment extends TestResource { + + /** + * Create a new {@link StreamExecutionEnvironment} for configuring and executing the Flink job. + * + * @return An instance of execution environment + */ + StreamExecutionEnvironment createExecutionEnvironment(); +} diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java new file mode 100644 index 0000000..dacc3c1 --- /dev/null +++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.test.common.external; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; + +import java.io.Serializable; +import java.util.Collection; + +/** + * Context of the test interacting with external system. + * + * <p>This context is responsible for providing: + * + * <ul> + * <li>Instance of sources connecting to external system + * <li>{@link SourceSplitDataWriter} for creating splits and writing test data into the created + * split + * <li>Test data to write into the external system + * </ul> + * + * @param <T> Type of elements after deserialization by source + */ +@Experimental +public interface ExternalContext<T> extends Serializable, AutoCloseable { + + /** + * Create a new instance of connector source implemented in {@link Source}. + * + * @return A new instance of Source + */ + Source<T, ?, ?> createSource(Boundedness boundedness); + + /** + * Create a new split in the external system and a data writer corresponding to the new split. + * + * @return A data writer for the created split. + */ + SourceSplitDataWriter<T> createSourceSplitDataWriter(); + + /** + * Generate test data. + * + * @param seed Seed for generating random test data set. + * @return Collection of generated test data. 
+ */ + Collection<T> generateTestData(long seed); + + /** + * Factory for {@link ExternalContext}. + * + * @param <T> Type of elements after deserialization by source + */ + interface Factory<T> { + ExternalContext<T> createExternalContext(); + } +} diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/SourceSplitDataWriter.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/SourceSplitDataWriter.java new file mode 100644 index 0000000..247358b --- /dev/null +++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/SourceSplitDataWriter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.test.common.external; + +import org.apache.flink.annotation.Experimental; + +import java.util.Collection; + +/** + * A data writer for writing records into a {@link + * org.apache.flink.api.connector.source.SourceSplit} in the external system. 
+ * + * @param <T> Type of writing records + */ +@Experimental +public interface SourceSplitDataWriter<T> extends AutoCloseable { + void writeRecords(Collection<T> records); +} diff --git a/flink-test-utils-parent/pom.xml b/flink-test-utils-parent/pom.xml index 801348e..2b4a4d7 100644 --- a/flink-test-utils-parent/pom.xml +++ b/flink-test-utils-parent/pom.xml @@ -39,6 +39,7 @@ under the License. <module>flink-test-utils-junit</module> <module>flink-test-utils</module> <module>flink-connector-test-utils</module> + <module>flink-connector-testing</module> </modules> </project>
