This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new 9743a29 [FLINK-33073][Connectors/AWS] Implement end-to-end tests for
KinesisStreamsSink
9743a29 is described below
commit 9743a29ea53836ba5486548533adacc3a9b4db50
Author: Hong Liang Teoh <[email protected]>
AuthorDate: Tue Sep 12 00:26:56 2023 +0100
[FLINK-33073][Connectors/AWS] Implement end-to-end tests for
KinesisStreamsSink
---
.github/workflows/common.yml | 22 ++
.github/workflows/nightly.yml | 1 +
flink-connector-aws-e2e-tests/README.md | 9 +
.../pom.xml | 24 +-
.../kinesis/sink/KinesisStreamsSinkITCase.java | 359 +++++++++++++++++++++
.../kinesis/testutils/AWSEndToEndTestUtils.java | 78 +++++
.../testutils/AWSKinesisResourceManager.java | 120 +++++++
.../testutils/StaleResourceCleanupITCase.java | 42 +++
.../src/test/resources/junit-platform.properties | 4 +
.../src/test/resources/log4j2-test.properties | 3 +
flink-connector-aws-e2e-tests/pom.xml | 32 ++
11 files changed, 677 insertions(+), 17 deletions(-)
diff --git a/.github/workflows/common.yml b/.github/workflows/common.yml
index 237a779..e087cf4 100644
--- a/.github/workflows/common.yml
+++ b/.github/workflows/common.yml
@@ -41,6 +41,11 @@ on:
required: false
type: number
default: 50
+      run_aws_end_to_end_test:
+        type: boolean
+        required: false
+        default: false
+        description: 'Whether to run the AWS end to end tests, requiring AWS credentials.'
jobs:
compile_and_test:
@@ -55,6 +60,8 @@ jobs:
FLINK_CACHE_DIR: "/tmp/cache/flink"
MVN_BUILD_OUTPUT_FILE: "/tmp/mvn_build_output.out"
MVN_VALIDATION_DIR: "/tmp/flink-validation-deployment"
+      # AWS credentials for the AWS end-to-end tests. Prefer repository secrets: unlike
+      # configuration variables (vars), secret values are masked in workflow logs. Callers of
+      # this reusable workflow must pass secrets (e.g. `secrets: inherit`) for them to be
+      # visible here; vars are kept only as a backward-compatible fallback.
+      FLINK_AWS_USER: ${{ secrets.FLINK_AWS_USER || vars.FLINK_AWS_USER }}
+      FLINK_AWS_PASSWORD: ${{ secrets.FLINK_AWS_PASSWORD || vars.FLINK_AWS_PASSWORD }}
steps:
- run: echo "Running CI pipeline for JDK version ${{ matrix.jdk }}"
@@ -112,6 +119,21 @@ jobs:
mvn clean
+      - name: Run AWS e2e tests
+        if: inputs.run_aws_end_to_end_test
+        working-directory: flink-connector-aws-e2e-tests
+        run: |
+          set -o pipefail
+
+          mvn clean verify ${{ env.MVN_CONNECTION_OPTIONS }} \
+            -DaltDeploymentRepository=validation_repository::default::file:${{ env.MVN_VALIDATION_DIR }} \
+            -Dflink.version=${{ inputs.flink_version }} \
+            -Prun-aws-end-to-end-tests -DdistDir=${{ env.FLINK_CACHE_DIR }}/flink-${{ inputs.flink_version }} \
+            | tee ${{ env.MVN_BUILD_OUTPUT_FILE }}
+
+          mvn clean
+
- name: Check licensing
run: |
mvn ${MVN_COMMON_OPTIONS} exec:java@check-license -N \
diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml
index 7084a92..2f4e365 100644
--- a/.github/workflows/nightly.yml
+++ b/.github/workflows/nightly.yml
@@ -31,3 +31,4 @@ jobs:
flink_version: ${{ matrix.flink }}
flink_url: https://s3.amazonaws.com/flink-nightly/flink-${{ matrix.flink
}}-bin-scala_2.12.tgz
cache_flink_binary: false
+      # Nightly builds also run the AWS end-to-end tests, which require AWS credentials
+      # (FLINK_AWS_USER / FLINK_AWS_PASSWORD) to be available to the called workflow.
+      run_aws_end_to_end_test: true
diff --git a/flink-connector-aws-e2e-tests/README.md
b/flink-connector-aws-e2e-tests/README.md
index 7d4696c..02cad0f 100644
--- a/flink-connector-aws-e2e-tests/README.md
+++ b/flink-connector-aws-e2e-tests/README.md
@@ -11,3 +11,12 @@ Example command to run end-to-end tests:
mvn clean verify -Prun-end-to-end-tests
-DdistDir=<path-to-dist>/flink-{FLINK_VERSION}/lib/flink-dist-{FLINK_VERSION}.jar
```
+### Running end-to-end tests which require AWS credentials
+
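+This test suite includes some integration tests that require AWS credentials. These tests run directly against real AWS services: they create (and afterwards delete) Kinesis Data Streams in the `ap-southeast-1` region of the AWS account the credentials belong to.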
+
+To run these tests, set the AWS access key ID and secret access key in the `FLINK_AWS_USER` and `FLINK_AWS_PASSWORD` environment variables, and activate the `run-aws-end-to-end-tests` Maven profile.
+Example command:
+```
+FLINK_AWS_USER=<AWS_ACCESS_KEY_ID> FLINK_AWS_PASSWORD=<AWS_SECRET_ACCESS_KEY>
mvn clean verify -Prun-aws-end-to-end-tests
-DdistDir=<path-to-dist>/flink-{FLINK_VERSION}/lib/flink-dist-{FLINK_VERSION}.jar
+```
\ No newline at end of file
diff --git
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml
index 3ce3764..5137e45 100644
---
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml
+++
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml
@@ -41,6 +41,13 @@
<scope>test</scope>
</dependency>
+		<!-- Connector under test: the end-to-end tests use KinesisStreamsSink directly. -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-aws-kinesis-streams</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-aws-base</artifactId>
@@ -121,23 +128,6 @@
</artifactItems>
</configuration>
</plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <systemPropertyVariables>
- <!-- Required for Kinesalite. -->
- <!-- Including shaded and non-shaded conf to support
test running from Maven and IntelliJ -->
-
<com.amazonaws.sdk.disableCbor>true</com.amazonaws.sdk.disableCbor>
-
<com.amazonaws.sdk.disableCertChecking>true</com.amazonaws.sdk.disableCertChecking>
-
<org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCbor>true
-
</org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCbor>
-
<org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCertChecking>true
-
</org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCertChecking>
- </systemPropertyVariables>
- </configuration>
- </plugin>
</plugins>
</build>
</project>
diff --git
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
new file mode 100644
index 0000000..9287d2e
--- /dev/null
+++
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.kinesis.testutils.AWSEndToEndTestUtils;
+import org.apache.flink.connector.kinesis.testutils.AWSKinesisResourceManager;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import
org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
+import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+import java.util.Properties;
+
+import static
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
+
+/**
+ * End-to-end tests for Kinesis Data Streams Sink.
+ *
+ * <p>Each scenario creates a fresh Kinesis Data Stream through {@link AWSKinesisResourceManager},
+ * writes generated strings to it with {@link KinesisStreamsSink} and reads the first shard back to
+ * count the persisted records. Streams are deleted again in {@link #teardown()}.
+ */
+@ExtendWith(MiniClusterExtension.class)
+@Execution(ExecutionMode.CONCURRENT)
+@Tag("requires-aws-credentials")
+@Disabled
+class KinesisStreamsSinkITCase {
+
+    private static final String DEFAULT_FIRST_SHARD_NAME = "shardId-000000000000";
+
+    private final SerializationSchema<String> serializationSchema = new SimpleStringSchema();
+    private final PartitionKeyGenerator<String> partitionKeyGenerator =
+            element -> String.valueOf(element.hashCode());
+    // Uses the whole element as partition key, so large elements produce oversized keys.
+    private final PartitionKeyGenerator<String> longPartitionKeyGenerator = element -> element;
+
+    private StreamExecutionEnvironment env;
+    private SdkHttpClient httpClient;
+    private KinesisClient kinesisClient;
+    private AWSKinesisResourceManager kinesisResourceManager;
+
+    @BeforeEach
+    void setUp() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        httpClient = AWSServicesTestUtils.createHttpClient();
+        kinesisClient =
+                AWSEndToEndTestUtils.createAwsSyncClient(httpClient, KinesisClient.builder());
+        kinesisResourceManager = new AWSKinesisResourceManager(kinesisClient);
+    }
+
+    @AfterEach
+    void teardown() {
+        // Delete the streams created by this test, but always release the SDK clients: if
+        // deleting a stream throws, the HTTP and Kinesis clients must not leak.
+        // The null check covers setUp() failing before the resource manager was created
+        // (e.g. when the AWS credentials are not configured).
+        try {
+            if (kinesisResourceManager != null) {
+                kinesisResourceManager.close();
+            }
+        } finally {
+            AWSGeneralUtil.closeResources(httpClient, kinesisClient);
+        }
+    }
+
+    @Test
+    void elementsMaybeWrittenSuccessfullyToLocalInstanceWhenBatchSizeIsReached() throws Exception {
+        new Scenario().runScenario();
+    }
+
+    @Test
+    void elementsBufferedAndTriggeredByTimeBasedFlushShouldBeFlushedIfSourcedIsKeptAlive()
+            throws Exception {
+        new Scenario()
+                .withNumberOfElementsToSend(10)
+                .withMaxBatchSize(100)
+                .withExpectedElements(10)
+                .runScenario();
+    }
+
+    @Test
+    void veryLargeMessagesSucceedInBeingPersisted() throws Exception {
+        new Scenario()
+                .withNumberOfElementsToSend(5)
+                .withSizeOfMessageBytes(2500)
+                .withMaxBatchSize(10)
+                .withExpectedElements(5)
+                .runScenario();
+    }
+
+    @Test
+    void multipleInFlightRequestsResultsInCorrectNumberOfElementsPersisted() throws Exception {
+        new Scenario()
+                .withNumberOfElementsToSend(150)
+                .withSizeOfMessageBytes(2500)
+                .withBufferMaxTimeMS(2000)
+                .withMaxInflightReqs(10)
+                .withMaxBatchSize(20)
+                .withExpectedElements(150)
+                .runScenario();
+    }
+
+    @Test
+    void nonExistentStreamNameShouldResultInFailureInFailOnErrorIsOn() {
+        testJobFatalFailureTerminatesCorrectlyWithFailOnErrorFlagSetTo(true);
+    }
+
+    @Test
+    void nonExistentStreamNameShouldResultInFailureInFailOnErrorIsOff() {
+        testJobFatalFailureTerminatesCorrectlyWithFailOnErrorFlagSetTo(false);
+    }
+
+    /** Writing to a stream name that does not exist must fail the job. */
+    private void testJobFatalFailureTerminatesCorrectlyWithFailOnErrorFlagSetTo(
+            boolean failOnError) {
+        Assertions.assertThatExceptionOfType(JobExecutionException.class)
+                .isThrownBy(
+                        () ->
+                                new Scenario()
+                                        .withSinkConnectionStreamName(
+                                                "flink-test-stream-not-exists")
+                                        .withFailOnError(failOnError)
+                                        .runScenario())
+                .havingCause()
+                .havingCause()
+                .havingCause()
+                .withMessageContaining("Stream flink-test-stream-not-exists under account");
+    }
+
+    @Test
+    void veryLargeMessagesFailGracefullyWithBrokenElementConverter() {
+        Assertions.assertThatExceptionOfType(JobExecutionException.class)
+                .isThrownBy(
+                        () ->
+                                new Scenario()
+                                        .withNumberOfElementsToSend(5)
+                                        .withSizeOfMessageBytes(2500)
+                                        .withExpectedElements(5)
+                                        .withSerializationSchema(serializationSchema)
+                                        .withPartitionKeyGenerator(longPartitionKeyGenerator)
+                                        .runScenario())
+                .havingCause()
+                .havingCause()
+                .withMessageContaining(
+                        "Encountered an exception while persisting records, not retrying due to {failOnError} being set.");
+    }
+
+    @Test
+    void badEndpointShouldResultInFailureWhenInFailOnErrorIsOn() {
+        badEndpointShouldResultInFailureWhenInFailOnErrorIs(true);
+    }
+
+    @Test
+    void badEndpointShouldResultInFailureWhenInFailOnErrorIsOff() {
+        badEndpointShouldResultInFailureWhenInFailOnErrorIs(false);
+    }
+
+    /** Pointing the client at an unresolvable endpoint must fail the job. */
+    private void badEndpointShouldResultInFailureWhenInFailOnErrorIs(boolean failOnError) {
+        Properties properties = getDefaultProperties();
+        properties.setProperty(AWS_ENDPOINT, "https://bad-endpoint-with-uri");
+        assertRunWithPropertiesAndStreamShouldFailWithExceptionOfType(
+                failOnError,
+                properties,
+                "UnknownHostException when attempting to interact with a service.");
+    }
+
+    @Test
+    void accessDeniedShouldFailJobWhenFailOnErrorIsOn() {
+        accessDeniedShouldFailJobWhenFailOnErrorIs(true);
+    }
+
+    @Test
+    void accessDeniedShouldFailJobWhenFailOnErrorIsOff() {
+        accessDeniedShouldFailJobWhenFailOnErrorIs(false);
+    }
+
+    /** Writing to a stream the credentials may not access must fail the job. */
+    private void accessDeniedShouldFailJobWhenFailOnErrorIs(boolean failOnError) {
+        Assertions.assertThatExceptionOfType(JobExecutionException.class)
+                .isThrownBy(
+                        () ->
+                                new Scenario()
+                                        .withSinkConnectionStreamName(
+                                                "no-permission-for-this-stream")
+                                        .withFailOnError(failOnError)
+                                        .runScenario())
+                .havingCause()
+                .havingCause()
+                .withMessageContaining(
+                        "Encountered an exception while persisting records, not retrying due to {failOnError} being set.")
+                .havingCause()
+                .withMessageContaining(
+                        "is not authorized to perform: kinesis:PutRecords on resource");
+    }
+
+    private void assertRunWithPropertiesAndStreamShouldFailWithExceptionOfType(
+            boolean failOnError, Properties properties, String expectedMessage) {
+        Assertions.assertThatExceptionOfType(JobExecutionException.class)
+                .isThrownBy(
+                        () ->
+                                new Scenario()
+                                        .withFailOnError(failOnError)
+                                        .withProperties(properties)
+                                        .runScenario())
+                .withStackTraceContaining(expectedMessage);
+    }
+
+    private Properties getDefaultProperties() {
+        return AWSEndToEndTestUtils.createTestConfig();
+    }
+
+    /**
+     * Builder-style description of a single sink run. {@link #runScenario()} creates a stream,
+     * runs the job and asserts how many records landed in the stream's first shard.
+     */
+    private class Scenario {
+        private int numberOfElementsToSend = 50;
+        private int sizeOfMessageBytes = 25;
+        private int bufferMaxTimeMS = 1000;
+        private int maxInflightReqs = 1;
+        private int maxBatchSize = 50;
+        private int expectedElements = 50;
+        private boolean failOnError = false;
+        private String kinesisDataStreamARN = null;
+        // When set, the sink targets this stream name instead of the freshly created stream.
+        private String sinkConnectionStreamName = null;
+        private String sinkConnectionStreamArn = null;
+        private SerializationSchema<String> serializationSchema =
+                KinesisStreamsSinkITCase.this.serializationSchema;
+        private PartitionKeyGenerator<String> partitionKeyGenerator =
+                KinesisStreamsSinkITCase.this.partitionKeyGenerator;
+        private Properties properties = KinesisStreamsSinkITCase.this.getDefaultProperties();
+
+        public void runScenario() throws Exception {
+            kinesisDataStreamARN = kinesisResourceManager.createKinesisDataStream();
+            if (sinkConnectionStreamName == null) {
+                sinkConnectionStreamArn = kinesisDataStreamARN;
+            }
+
+            DataStream<String> stream =
+                    env.addSource(
+                                    new DataGeneratorSource<>(
+                                            RandomGenerator.stringGenerator(sizeOfMessageBytes),
+                                            100,
+                                            (long) numberOfElementsToSend))
+                            .returns(String.class);
+
+            KinesisStreamsSink<String> kdsSink =
+                    KinesisStreamsSink.<String>builder()
+                            .setSerializationSchema(serializationSchema)
+                            .setPartitionKeyGenerator(partitionKeyGenerator)
+                            .setMaxTimeInBufferMS(bufferMaxTimeMS)
+                            .setMaxInFlightRequests(maxInflightReqs)
+                            .setMaxBatchSize(maxBatchSize)
+                            .setFailOnError(failOnError)
+                            .setMaxBufferedRequests(1000)
+                            .setStreamName(sinkConnectionStreamName)
+                            .setStreamArn(sinkConnectionStreamArn)
+                            .setKinesisClientProperties(properties)
+                            // NOTE(review): this call overrides setFailOnError(failOnError)
+                            // above, so every scenario currently runs with failOnError=true and
+                            // the "...IsOff" tests do not exercise failOnError=false. It is kept
+                            // because the IsOff assertions expect the failOnError=true message;
+                            // TODO confirm the intended behaviour before removing it.
+                            .setFailOnError(true)
+                            .build();
+
+            stream.sinkTo(kdsSink);
+
+            env.execute("KDS Async Sink Example Program");
+
+            String shardIterator =
+                    kinesisClient
+                            .getShardIterator(
+                                    GetShardIteratorRequest.builder()
+                                            .shardId(DEFAULT_FIRST_SHARD_NAME)
+                                            .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
+                                            .streamARN(kinesisDataStreamARN)
+                                            .build())
+                            .shardIterator();
+
+            Assertions.assertThat(
+                            kinesisClient
+                                    .getRecords(
+                                            GetRecordsRequest.builder()
+                                                    .shardIterator(shardIterator)
+                                                    .build())
+                                    .records()
+                                    .size())
+                    .isEqualTo(expectedElements);
+        }
+
+        public Scenario withNumberOfElementsToSend(int numberOfElementsToSend) {
+            this.numberOfElementsToSend = numberOfElementsToSend;
+            return this;
+        }
+
+        public Scenario withSizeOfMessageBytes(int sizeOfMessageBytes) {
+            this.sizeOfMessageBytes = sizeOfMessageBytes;
+            return this;
+        }
+
+        public Scenario withBufferMaxTimeMS(int bufferMaxTimeMS) {
+            this.bufferMaxTimeMS = bufferMaxTimeMS;
+            return this;
+        }
+
+        public Scenario withMaxInflightReqs(int maxInflightReqs) {
+            this.maxInflightReqs = maxInflightReqs;
+            return this;
+        }
+
+        public Scenario withMaxBatchSize(int maxBatchSize) {
+            this.maxBatchSize = maxBatchSize;
+            return this;
+        }
+
+        public Scenario withExpectedElements(int expectedElements) {
+            this.expectedElements = expectedElements;
+            return this;
+        }
+
+        public Scenario withFailOnError(boolean failOnError) {
+            this.failOnError = failOnError;
+            return this;
+        }
+
+        public Scenario withSinkConnectionStreamName(String sinkConnectionStreamName) {
+            this.sinkConnectionStreamName = sinkConnectionStreamName;
+            return this;
+        }
+
+        public Scenario withSerializationSchema(SerializationSchema<String> serializationSchema) {
+            this.serializationSchema = serializationSchema;
+            return this;
+        }
+
+        public Scenario withPartitionKeyGenerator(
+                PartitionKeyGenerator<String> partitionKeyGenerator) {
+            this.partitionKeyGenerator = partitionKeyGenerator;
+            return this;
+        }
+
+        public Scenario withProperties(Properties properties) {
+            this.properties = properties;
+            return this;
+        }
+    }
+}
diff --git
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/testutils/AWSEndToEndTestUtils.java
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/testutils/AWSEndToEndTestUtils.java
new file mode 100644
index 0000000..a8d2824
--- /dev/null
+++
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/testutils/AWSEndToEndTestUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.testutils;
+
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.regions.Region;
+
+import java.util.Properties;
+
+import static
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+
+/** Utility class for configuring end-to-end tests. */
+public class AWSEndToEndTestUtils {
+    private static final Region TEST_REGION = Region.AP_SOUTHEAST_1;
+
+    private AWSEndToEndTestUtils() {
+        // Private constructor to prevent initialization.
+    }
+
+    /**
+     * Builds a synchronous AWS SDK client that uses the given HTTP client together with the test
+     * region and the credentials from {@link #createTestConfig()}.
+     *
+     * @param httpClient HTTP client the SDK client should use
+     * @param clientBuilder service-specific client builder, e.g. {@code KinesisClient.builder()}
+     * @return the built client
+     */
+    public static <
+                    S extends SdkClient,
+                    T extends
+                            AwsSyncClientBuilder<? extends T, S> & AwsClientBuilder<? extends T, S>>
+            S createAwsSyncClient(SdkHttpClient httpClient, T clientBuilder) {
+        Properties config = createTestConfig();
+        return clientBuilder
+                .httpClient(httpClient)
+                .credentialsProvider(AWSGeneralUtil.getCredentialsProvider(config))
+                .region(AWSGeneralUtil.getRegion(config))
+                .build();
+    }
+
+    /**
+     * Creates connector properties with the test region and basic credentials taken from the
+     * {@code FLINK_AWS_USER} / {@code FLINK_AWS_PASSWORD} environment variables.
+     *
+     * @throws NullPointerException if either environment variable is not set
+     * @throws IllegalArgumentException if either environment variable is blank
+     */
+    public static Properties createTestConfig() {
+        Properties config = new Properties();
+        config.setProperty(AWS_REGION, TEST_REGION.toString());
+        configureTestCredentials(config);
+        return config;
+    }
+
+    private static void configureTestCredentials(Properties config) {
+        // Validate both variables before touching the config, as before.
+        String accessKeyId = getRequiredEnv("FLINK_AWS_USER");
+        String secretAccessKey = getRequiredEnv("FLINK_AWS_PASSWORD");
+        config.setProperty(
+                AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), accessKeyId);
+        config.setProperty(
+                AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), secretAccessKey);
+    }
+
+    /**
+     * Reads a mandatory environment variable. Blank values are rejected as well: CI expressions
+     * such as {@code ${{ vars.FLINK_AWS_USER }}} expand to an empty string when the variable is
+     * not defined, which would otherwise surface later as a confusing authentication failure.
+     */
+    private static String getRequiredEnv(String name) {
+        String value = System.getenv(name);
+        String errorMessage = name + " not configured for end to end test.";
+        Preconditions.checkNotNull(value, errorMessage);
+        Preconditions.checkArgument(!value.trim().isEmpty(), errorMessage);
+        return value;
+    }
+}
diff --git
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/testutils/AWSKinesisResourceManager.java
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/testutils/AWSKinesisResourceManager.java
new file mode 100644
index 0000000..5e3e621
--- /dev/null
+++
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/testutils/AWSKinesisResourceManager.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.testutils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
+import software.amazon.awssdk.services.kinesis.model.StreamMode;
+import software.amazon.awssdk.services.kinesis.model.StreamModeDetails;
+import software.amazon.awssdk.services.kinesis.model.StreamSummary;
+import software.amazon.awssdk.services.kinesis.paginators.ListStreamsIterable;
+import software.amazon.awssdk.services.kinesis.waiters.KinesisWaiter;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+/** Resource manager to create and clean up Kinesis Data Stream resources used for each test. */
+public class AWSKinesisResourceManager implements AutoCloseable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(AWSKinesisResourceManager.class);
+
+    private static final String KINESIS_DATA_STREAMS_NAME_PREFIX = "flink-test-";
+    private static final int DEFAULT_SHARD_COUNT = 1;
+    private final KinesisClient kinesisClient;
+    private final KinesisWaiter kinesisWaiter;
+    // ARNs of the streams created by this manager; deleted in close().
+    private final Set<String> kinesisDataStreamARNs = new HashSet<>();
+
+    public AWSKinesisResourceManager(KinesisClient kinesisClient) {
+        this.kinesisClient = kinesisClient;
+        this.kinesisWaiter = KinesisWaiter.builder().client(kinesisClient).build();
+    }
+
+    /**
+     * Creates a single-shard provisioned stream with a random {@code flink-test-} name, records it
+     * for deletion on {@link #close()} and waits until the stream exists.
+     *
+     * @return the ARN of the created stream
+     */
+    public String createKinesisDataStream() {
+        String streamName = KINESIS_DATA_STREAMS_NAME_PREFIX + UUID.randomUUID();
+        LOGGER.info("Creating Kinesis Data Stream with name {}.", streamName);
+        kinesisClient.createStream(
+                CreateStreamRequest.builder()
+                        .streamName(streamName)
+                        .shardCount(DEFAULT_SHARD_COUNT)
+                        .streamModeDetails(
+                                StreamModeDetails.builder()
+                                        .streamMode(StreamMode.PROVISIONED)
+                                        .build())
+                        .build());
+        DescribeStreamResponse response =
+                kinesisClient.describeStream(
+                        DescribeStreamRequest.builder().streamName(streamName).build());
+        String streamARN = response.streamDescription().streamARN();
+        kinesisDataStreamARNs.add(streamARN);
+        LOGGER.info("Successfully created Kinesis Data Stream with ARN {}.", streamARN);
+
+        LOGGER.info("Waiting until Kinesis Data Stream with ARN {} is ACTIVE.", streamARN);
+        kinesisWaiter.waitUntilStreamExists(
+                DescribeStreamRequest.builder().streamARN(streamARN).build());
+        return streamARN;
+    }
+
+    /**
+     * Deletes every stream created by this manager. A failed deletion no longer prevents the
+     * remaining streams from being deleted; the first failure is rethrown afterwards with any
+     * further failures attached as suppressed exceptions.
+     */
+    @Override
+    public void close() {
+        try {
+            deleteStreams(kinesisDataStreamARNs);
+        } finally {
+            // Makes close() idempotent; leftovers are picked up by the stale-stream cleanup.
+            kinesisDataStreamARNs.clear();
+        }
+    }
+
+    /**
+     * Deletes all streams with the test name prefix that were created at least 2 days ago. All
+     * stale streams are attempted even if one deletion fails.
+     */
+    public void cleanUpStaleKinesisDataStreams() {
+        // Delete all test streams created at least 2 days ago.
+        Instant twoDaysAgo = Instant.now().minus(2, ChronoUnit.DAYS);
+
+        LOGGER.info(
+                "Listing all Kinesis Data Stream with prefix {} created more than 2 days ago.",
+                KINESIS_DATA_STREAMS_NAME_PREFIX);
+        Set<String> staleStreamARNs = new HashSet<>();
+        ListStreamsIterable listStreamsResponses = kinesisClient.listStreamsPaginator();
+        listStreamsResponses.forEach(
+                listStreamsResponse ->
+                        listStreamsResponse.streamSummaries().stream()
+                                .filter(
+                                        streamSummary ->
+                                                streamSummary
+                                                        .streamName()
+                                                        .startsWith(
+                                                                KINESIS_DATA_STREAMS_NAME_PREFIX))
+                                .filter(
+                                        streamSummary ->
+                                                streamSummary
+                                                        .streamCreationTimestamp()
+                                                        .isBefore(twoDaysAgo))
+                                .map(StreamSummary::streamARN)
+                                .forEach(
+                                        streamARN -> {
+                                            LOGGER.warn(
+                                                    "Found stale Kinesis Data Stream with ARN {}. Deleting stream",
+                                                    streamARN);
+                                            staleStreamARNs.add(streamARN);
+                                        }));
+        deleteStreams(staleStreamARNs);
+    }
+
+    /**
+     * Attempts to delete every given stream. Failures are logged and collected; the first one is
+     * rethrown after all deletions were attempted, with later ones added as suppressed.
+     */
+    private void deleteStreams(Iterable<String> streamARNs) {
+        RuntimeException failure = null;
+        for (String streamARN : streamARNs) {
+            try {
+                deleteStream(streamARN);
+            } catch (RuntimeException e) {
+                LOGGER.warn("Failed to delete Kinesis Data Stream with ARN {}.", streamARN, e);
+                if (failure == null) {
+                    failure = e;
+                } else {
+                    failure.addSuppressed(e);
+                }
+            }
+        }
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    private void deleteStream(String streamARN) {
+        LOGGER.info("Deleting Kinesis Data Stream with ARN {}.", streamARN);
+        kinesisClient.deleteStream(DeleteStreamRequest.builder().streamARN(streamARN).build());
+    }
+}
diff --git
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/testutils/StaleResourceCleanupITCase.java
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/testutils/StaleResourceCleanupITCase.java
new file mode 100644
index 0000000..66cad7c
--- /dev/null
+++
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/testutils/StaleResourceCleanupITCase.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.testutils;
+
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+
+/**
+ * Housekeeping "test" with no assertions: it only triggers {@link
+ * AWSKinesisResourceManager#cleanUpStaleKinesisDataStreams()} so that old test streams are
+ * removed from the AWS account used by the end-to-end tests.
+ */
+@Tag("requires-aws-credentials")
+public class StaleResourceCleanupITCase {
+
+    @Test
+    void cleanUpStaleKinesisStreams() {
+        try (SdkHttpClient sharedHttpClient = AWSServicesTestUtils.createHttpClient()) {
+            // Inner resources close first (manager, then client), before the HTTP client.
+            try (KinesisClient client =
+                            AWSEndToEndTestUtils.createAwsSyncClient(
+                                    sharedHttpClient, KinesisClient.builder());
+                    AWSKinesisResourceManager manager = new AWSKinesisResourceManager(client)) {
+                manager.cleanUpStaleKinesisDataStreams();
+            }
+        }
+    }
+}
diff --git
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/junit-platform.properties
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/junit-platform.properties
new file mode 100644
index 0000000..5f842f1
--- /dev/null
+++
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/junit-platform.properties
@@ -0,0 +1,4 @@
+# Enable JUnit Jupiter parallel execution with a fixed-size pool; KinesisStreamsSinkITCase
+# opts in via @Execution(ExecutionMode.CONCURRENT).
+junit.jupiter.execution.parallel.enabled = true
+junit.jupiter.execution.parallel.config.strategy = fixed
+# Each concurrently running test creates (and later deletes) its own stream. We set parallelism
+# of 4 because the maximum number of Kinesis Data Streams in CREATING/DELETING state is 5.
+junit.jupiter.execution.parallel.config.fixed.parallelism = 4
diff --git
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/log4j2-test.properties
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/log4j2-test.properties
index 835c2ec..c1ab1dd 100644
---
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/log4j2-test.properties
+++
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/log4j2-test.properties
@@ -26,3 +26,6 @@ appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+
+# Only WARN and above from the test resource manager: hides its per-stream INFO create/delete
+# messages while still reporting stale-stream deletions, which it logs at WARN.
+logger.resourceManager.name = org.apache.flink.connector.kinesis.testutils.AWSKinesisResourceManager
+logger.resourceManager.level = WARN
diff --git a/flink-connector-aws-e2e-tests/pom.xml
b/flink-connector-aws-e2e-tests/pom.xml
index b46b4ec..a0b126d 100644
--- a/flink-connector-aws-e2e-tests/pom.xml
+++ b/flink-connector-aws-e2e-tests/pom.xml
@@ -88,6 +88,38 @@ under the License.
<systemPropertyVariables>
<moduleDir>${project.basedir}</moduleDir>
</systemPropertyVariables>
+
<excludedGroups>requires-aws-credentials</excludedGroups>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>run-aws-end-to-end-tests</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>end-to-end-tests</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <includes>
+ <include>**/*.*</include>
+ </includes>
+ <groups>requires-aws-credentials</groups>
+ <!-- E2E tests must not access flink-dist
concurrently. -->
+ <forkCount>1</forkCount>
+ <systemPropertyVariables>
+
<moduleDir>${project.basedir}</moduleDir>
+ </systemPropertyVariables>
</configuration>
</execution>
</executions>