This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
commit d9a3b05a56232f02ce007321e6385df5deb2dbba Author: Danny Cranmer <[email protected]> AuthorDate: Sat Dec 3 19:56:37 2022 +0000 [FLINK-29908][Connectors/Firehose] Externalize and configure E2E tests for Kinesis connector v2 --- .github/workflows/ci.yml | 4 +- .../pom.xml | 7 - .../pom.xml | 74 +++--- .../table/test/KinesisStreamsTableApiIT.java | 271 +++++++++++++++++++++ .../src/test/resources/log4j2-test.properties | 28 +++ .../src/test/resources/send-orders.sql | 36 +++ flink-connector-aws-e2e-tests/pom.xml | 9 + 7 files changed, 374 insertions(+), 55 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 76962f3..f68d46d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -59,12 +59,12 @@ jobs: id: cache-flink with: path: ${{ env.FLINK_CACHE_DIR }} - key: ${{ inputs.flink_url }} + key: ${{ env.FLINK_URL }} - name: Download Flink binary working-directory: ${{ env.FLINK_CACHE_DIR }} if: steps.cache-flink.outputs.cache-hit != 'true' - run: wget -q -c ${{ inputs.flink_url }} -O - | tar -xz + run: wget -q -c ${{ env.FLINK_URL }} -O - | tar -xz - name: Compile and test flink-connector-dynamodb run: | diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml index 8c9a1c3..b2e0878 100644 --- a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml +++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml @@ -48,12 +48,6 @@ <scope>test</scope> <type>test-jar</type> </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-test-utils</artifactId> - <version>${flink.version}</version> - <scope>test</scope> - </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> @@ -124,7 +118,6 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> - <version>2.22.1</version> <configuration> <systemPropertyVariables> <!-- Required for Kinesalite. --> diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml similarity index 68% copy from flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml copy to flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml index 8c9a1c3..1a5d793 100644 --- a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml +++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml @@ -29,70 +29,53 @@ <modelVersion>4.0.0</modelVersion> - <artifactId>flink-connector-aws-kinesis-firehose-e2e-tests</artifactId> - <name>Flink : Connectors : AWS : E2E Tests : Amazon Kinesis Firehose e2e tests</name> + <artifactId>flink-connector-aws-kinesis-streams-e2e-tests</artifactId> + <name>Flink : Connectors : AWS : E2E Tests : Amazon Kinesis Streams e2e tests</name> <packaging>jar</packaging> <dependencies> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-aws-kinesis-firehose</artifactId> - <version>${project.version}</version> + <artifactId>flink-streaming-java</artifactId> + <version>${flink.version}</version> <scope>test</scope> - <type>test-jar</type> </dependency> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-aws-base</artifactId> <version>${project.version}</version> <scope>test</scope> <type>test-jar</type> + <exclusions> + <exclusion> + <groupId>com.typesafe.netty</groupId> + <artifactId>netty-reactive-streams-http</artifactId> + </exclusion> + </exclusions> </dependency> + <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-test-utils</artifactId> - <version>${flink.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_${scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.testcontainers</groupId> - <artifactId>testcontainers</artifactId> + <artifactId>flink-connector-aws-kinesis-streams</artifactId> + <version>${project.version}</version> <scope>test</scope> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-annotations</artifactId> + <type>test-jar</type> + <exclusions> + <exclusion> + <groupId>com.typesafe.netty</groupId> + <artifactId>netty-reactive-streams-http</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> - </dependency> - - <dependency> - <groupId>software.amazon.awssdk</groupId> - <artifactId>s3</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>software.amazon.awssdk</groupId> - <artifactId>iam</artifactId> <scope>test</scope> </dependency> </dependencies> + <build> <plugins> <plugin> @@ -111,9 +94,9 @@ <artifactItems> <artifactItem> <groupId>org.apache.flink</groupId> - <artifactId>flink-sql-connector-aws-kinesis-firehose</artifactId> + <artifactId>flink-sql-connector-aws-kinesis-streams</artifactId> <version>${project.version}</version> - <destFileName>sql-kinesis-firehose.jar</destFileName> + <destFileName>sql-kinesis-streams.jar</destFileName> <type>jar</type> <outputDirectory>${project.build.directory}/dependencies</outputDirectory> </artifactItem> @@ -124,17 +107,16 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> - <version>2.22.1</version> <configuration> <systemPropertyVariables> <!-- Required for Kinesalite. --> <!-- Including shaded and non-shaded conf to support test running from Maven and IntelliJ --> <com.amazonaws.sdk.disableCbor>true</com.amazonaws.sdk.disableCbor> <com.amazonaws.sdk.disableCertChecking>true</com.amazonaws.sdk.disableCertChecking> - <org.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCbor>true - </org.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCbor> - <org.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCertChecking>true - </org.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCertChecking> + <org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCbor>true + </org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCbor> + <org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCertChecking>true + </org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCertChecking> </systemPropertyVariables> </configuration> </plugin> diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java new file mode 100644 index 0000000..6c0a944 --- /dev/null +++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.table.test; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; +import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer; +import org.apache.flink.test.resources.ResourceTestUtils; +import org.apache.flink.test.util.SQLJobSubmission; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.jackson.JacksonMapperFactory; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.rnorth.ducttape.ratelimits.RateLimiter; +import org.rnorth.ducttape.ratelimits.RateLimiterBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import software.amazon.awssdk.services.kinesis.model.StreamStatus; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static java.util.concurrent.TimeUnit.SECONDS; + +/** End-to-end test for Kinesis Streams Table API Sink using Kinesalite. */ +public class KinesisStreamsTableApiIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(KinesisStreamsTableApiIT.class); + + private static final String ORDERS_STREAM = "orders"; + private static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite"; + private static final String DEFAULT_FIRST_SHARD_NAME = "shardId-000000000000"; + private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); + + private SdkHttpClient httpClient; + private KinesisClient kinesisClient; + + private final Path sqlConnectorKinesisJar = + ResourceTestUtils.getResource(".*kinesis-streams.jar"); + private static final Network network = Network.newNetwork(); + + @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES); + + @ClassRule + public static final KinesaliteContainer KINESALITE = + new KinesaliteContainer(DockerImageName.parse(DockerImageVersions.KINESALITE)) + .withNetwork(network) + .withNetworkAliases(INTER_CONTAINER_KINESALITE_ALIAS); + + public static final TestcontainersSettings TESTCONTAINERS_SETTINGS = + TestcontainersSettings.builder() + .environmentVariable("AWS_CBOR_DISABLE", "1") + .environmentVariable( + "FLINK_ENV_JAVA_OPTS", + "-Dorg.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false") + .network(network) + .logger(LOGGER) + .dependsOn(KINESALITE) + .build(); + + public static final FlinkContainers FLINK = + FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build(); + + @BeforeClass + public static void setupFlink() throws Exception { + FLINK.start(); + } + + @AfterClass + public static void stopFlink() { + FLINK.stop(); + } + + @Before + public void setUp() throws Exception { + System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false"); + httpClient = AWSServicesTestUtils.createHttpClient(); + kinesisClient = KINESALITE.createHostClient(httpClient); + prepareStream(ORDERS_STREAM); + } + + @After + public void teardown() { + System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property()); + AWSGeneralUtil.closeResources(httpClient, kinesisClient); + } + + @Test + public void testTableApiSourceAndSink() throws Exception { + executeSqlStatements(readSqlFile("send-orders.sql")); + List<Order> expected = + ImmutableList.of( + new Order("A", 10), + new Order("B", 12), + new Order("C", 14), + new Order("D", 16), + new Order("E", 18)); + // result order is not guaranteed + List<Order> result = readAllOrdersFromKinesis(); + Assertions.assertThat(result).containsAll(expected); + } + + private void prepareStream(String streamName) throws Exception { + final RateLimiter rateLimiter = + RateLimiterBuilder.newBuilder() + .withRate(1, SECONDS) + .withConstantThroughput() + .build(); + + kinesisClient.createStream( + CreateStreamRequest.builder().streamName(streamName).shardCount(1).build()); + + Deadline deadline = Deadline.fromNow(Duration.ofMinutes(1)); + while (!rateLimiter.getWhenReady(() -> streamExists(streamName))) { + if (deadline.isOverdue()) { + throw new RuntimeException("Failed to create stream within time"); + } + } + } + + private boolean streamExists(final String streamName) { + try { + return kinesisClient + .describeStream( + DescribeStreamRequest.builder().streamName(streamName).build()) + .streamDescription() + .streamStatus() + == StreamStatus.ACTIVE; + } catch (Exception e) { + return false; + } + } + + private List<Order> readAllOrdersFromKinesis() throws Exception { + Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10)); + List<Order> orders; + do { + orders = + readMessagesFromStream( + recordBytes -> fromJson(new String(recordBytes), Order.class)); + + } while (deadline.hasTimeLeft() && orders.size() < 5); + + return orders; + } + + private void executeSqlStatements(final List<String> sqlLines) throws Exception { + FLINK.submitSQLJob( + new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines) + .addJars(sqlConnectorKinesisJar) + .build()); + } + + private List<String> readSqlFile(final String resourceName) throws Exception { + return Files.readAllLines(Paths.get(getClass().getResource("/" + resourceName).toURI())); + } + + private <T> T fromJson(final String json, final Class<T> type) { + try { + return OBJECT_MAPPER.readValue(json, type); + } catch (JsonProcessingException e) { + throw new RuntimeException("Test Failure.", e); + } + } + + private <T> List<T> readMessagesFromStream(Function<byte[], T> deserialiser) throws Exception { + String shardIterator = + kinesisClient + .getShardIterator( + GetShardIteratorRequest.builder() + .shardId(DEFAULT_FIRST_SHARD_NAME) + .shardIteratorType(ShardIteratorType.TRIM_HORIZON) + .streamName(KinesisStreamsTableApiIT.ORDERS_STREAM) + .build()) + .shardIterator(); + + List<Record> records = + kinesisClient + .getRecords( + GetRecordsRequest.builder().shardIterator(shardIterator).build()) + .records(); + List<T> messages = new ArrayList<>(); + records.forEach(record -> messages.add(deserialiser.apply(record.data().asByteArray()))); + return messages; + } + + /** POJO class for orders used by e2e test. */ + public static class Order { + private final String code; + private final int quantity; + + public Order( + @JsonProperty("code") final String code, @JsonProperty("quantity") int quantity) { + this.code = code; + this.quantity = quantity; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + Order order = (Order) o; + return quantity == order.quantity && Objects.equals(code, order.code); + } + + @Override + public int hashCode() { + return Objects.hash(code, quantity); + } + + @Override + public String toString() { + return String.format("Order{code: %s, quantity: %d}", code, quantity); + } + } +} diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/log4j2-test.properties b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/log4j2-test.properties new file mode 100644 index 0000000..835c2ec --- /dev/null +++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/log4j2-test.properties @@ -0,0 +1,28 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = OFF +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/send-orders.sql b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/send-orders.sql new file mode 100644 index 0000000..29f2c88 --- /dev/null +++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/send-orders.sql @@ -0,0 +1,36 @@ +--/* +-- * Licensed to the Apache Software Foundation (ASF) under one +-- * or more contributor license agreements. See the NOTICE file +-- * distributed with this work for additional information +-- * regarding copyright ownership. The ASF licenses this file +-- * to you under the Apache License, Version 2.0 (the +-- * "License"); you may not use this file except in compliance +-- * with the License. You may obtain a copy of the License at +-- * +-- * http://www.apache.org/licenses/LICENSE-2.0 +-- * +-- * Unless required by applicable law or agreed to in writing, software +-- * distributed under the License is distributed on an "AS IS" BASIS, +-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- * See the License for the specific language governing permissions and +-- * limitations under the License. +-- */ + +CREATE TABLE orders ( + `code` STRING, + `quantity` BIGINT +) WITH ( + 'connector' = 'kinesis', + 'stream' = 'orders', + 'aws.region' = 'us-east-1', + 'aws.endpoint' = 'https://kinesalite:4567', + 'aws.credentials.provider' = 'BASIC', + 'aws.credentials.basic.accesskeyid' = 'access key', + 'aws.credentials.basic.secretkey' ='secret key', + 'aws.trust.all.certificates' = 'true', + 'sink.http-client.protocol.version' = 'HTTP1_1', + 'sink.batch.max-size' = '1', + 'format' = 'json' +); + +INSERT INTO orders VALUES ('A', 10),('B', 12),('C', 14),('D', 16),('E', 18); diff --git a/flink-connector-aws-e2e-tests/pom.xml b/flink-connector-aws-e2e-tests/pom.xml index 0835d85..b1266aa 100644 --- a/flink-connector-aws-e2e-tests/pom.xml +++ b/flink-connector-aws-e2e-tests/pom.xml @@ -40,8 +40,17 @@ under the License. <modules> <module>flink-connector-aws-kinesis-firehose-e2e-tests</module> + <module>flink-connector-aws-kinesis-streams-e2e-tests</module> </modules> + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-test-utils</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + </dependencies> <dependencyManagement> <!-- For dependency convergence --> <dependencies>
