This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new 9743a29  [FLINK-33073][Connectors/AWS] Implement end-to-end tests for KinesisStreamsSink
9743a29 is described below

commit 9743a29ea53836ba5486548533adacc3a9b4db50
Author: Hong Liang Teoh <[email protected]>
AuthorDate: Tue Sep 12 00:26:56 2023 +0100

    [FLINK-33073][Connectors/AWS] Implement end-to-end tests for KinesisStreamsSink
---
 .github/workflows/common.yml                       |  22 ++
 .github/workflows/nightly.yml                      |   1 +
 flink-connector-aws-e2e-tests/README.md            |   9 +
 .../pom.xml                                        |  24 +-
 .../kinesis/sink/KinesisStreamsSinkITCase.java     | 359 +++++++++++++++++++++
 .../kinesis/testutils/AWSEndToEndTestUtils.java    |  78 +++++
 .../testutils/AWSKinesisResourceManager.java       | 120 +++++++
 .../testutils/StaleResourceCleanupITCase.java      |  42 +++
 .../src/test/resources/junit-platform.properties   |   4 +
 .../src/test/resources/log4j2-test.properties      |   3 +
 flink-connector-aws-e2e-tests/pom.xml              |  32 ++
 11 files changed, 677 insertions(+), 17 deletions(-)

diff --git a/.github/workflows/common.yml b/.github/workflows/common.yml
index 237a779..e087cf4 100644
--- a/.github/workflows/common.yml
+++ b/.github/workflows/common.yml
@@ -41,6 +41,11 @@ on:
         required: false
         type: number
         default: 50
+      run_aws_end_to_end_test:
+        description: "Whether to run the AWS end to end tests, requiring AWS credentials."
+        required: false
+        type: boolean
+        default: false
 
 jobs:
   compile_and_test:
@@ -55,6 +60,8 @@ jobs:
       FLINK_CACHE_DIR: "/tmp/cache/flink"
       MVN_BUILD_OUTPUT_FILE: "/tmp/mvn_build_output.out"
       MVN_VALIDATION_DIR: "/tmp/flink-validation-deployment"
+      FLINK_AWS_USER: ${{ vars.FLINK_AWS_USER }}
+      FLINK_AWS_PASSWORD: ${{ vars.FLINK_AWS_PASSWORD }}
     steps:
       - run: echo "Running CI pipeline for JDK version ${{ matrix.jdk }}"
 
@@ -112,6 +119,21 @@ jobs:
           
           mvn clean
 
+      - name: Run AWS e2e tests
+        if: ${{ inputs.run_aws_end_to_end_test }}
+        run: |
+          set -o pipefail
+
+          cd flink-connector-aws-e2e-tests
+
+          mvn clean verify ${{ env.MVN_CONNECTION_OPTIONS }} \
+            -DaltDeploymentRepository=validation_repository::default::file:${{ env.MVN_VALIDATION_DIR }} \
+            -Dflink.version=${{ inputs.flink_version }}  \
+            -Prun-aws-end-to-end-tests -DdistDir=${{ env.FLINK_CACHE_DIR }}/flink-${{ inputs.flink_version }} \
+            | tee ${{ env.MVN_BUILD_OUTPUT_FILE }}
+
+          mvn clean
+
       - name: Check licensing
         run: |
           mvn ${MVN_COMMON_OPTIONS} exec:java@check-license -N \
diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml
index 7084a92..2f4e365 100644
--- a/.github/workflows/nightly.yml
+++ b/.github/workflows/nightly.yml
@@ -31,3 +31,4 @@ jobs:
       flink_version: ${{ matrix.flink }}
       flink_url: https://s3.amazonaws.com/flink-nightly/flink-${{ matrix.flink }}-bin-scala_2.12.tgz
       cache_flink_binary: false
+      run_aws_end_to_end_test: true
diff --git a/flink-connector-aws-e2e-tests/README.md 
b/flink-connector-aws-e2e-tests/README.md
index 7d4696c..02cad0f 100644
--- a/flink-connector-aws-e2e-tests/README.md
+++ b/flink-connector-aws-e2e-tests/README.md
@@ -11,3 +11,12 @@ Example command to run end-to-end tests:
 mvn clean verify -Prun-end-to-end-tests 
-DdistDir=<path-to-dist>/flink-{FLINK_VERSION}/lib/flink-dist-{FLINK_VERSION}.jar
 
 ```
 
+### Running end-to-end tests which require AWS credentials
+
+This test suite includes some integration tests that require access to AWS credentials. These tests run directly against AWS services.
+
+These tests can be run by configuring the required AWS credentials, and using the `run-aws-end-to-end-tests` maven profile.
+Example command:
+```
+FLINK_AWS_USER=<AWS_ACCESS_KEY_ID> FLINK_AWS_PASSWORD=<AWS_SECRET_ACCESS_KEY> mvn clean verify -Prun-aws-end-to-end-tests -DdistDir=<path-to-dist>/flink-{FLINK_VERSION}/lib/flink-dist-{FLINK_VERSION}.jar 
+```
\ No newline at end of file
diff --git 
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml
 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml
index 3ce3764..5137e45 100644
--- 
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml
+++ 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml
@@ -41,6 +41,13 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-kinesis-streams</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-aws-base</artifactId>
@@ -121,23 +128,6 @@
                     </artifactItems>
                 </configuration>
             </plugin>
-
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <configuration>
-                    <systemPropertyVariables>
-                        <!-- Required for Kinesalite. -->
-                        <!-- Including shaded and non-shaded conf to support test running from Maven and IntelliJ -->
-                        <com.amazonaws.sdk.disableCbor>true</com.amazonaws.sdk.disableCbor>
-                        <com.amazonaws.sdk.disableCertChecking>true</com.amazonaws.sdk.disableCertChecking>
-                        <org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCbor>true
-                        </org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCbor>
-                        <org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCertChecking>true
-                        </org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCertChecking>
-                    </systemPropertyVariables>
-                </configuration>
-            </plugin>
         </plugins>
     </build>
 </project>
diff --git 
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
new file mode 100644
index 0000000..9287d2e
--- /dev/null
+++ 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.kinesis.testutils.AWSEndToEndTestUtils;
+import org.apache.flink.connector.kinesis.testutils.AWSKinesisResourceManager;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
+import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+import java.util.Properties;
+
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
+
+/** End-to-end tests for Kinesis Data Streams Sink. */
+@ExtendWith(MiniClusterExtension.class)
+@Execution(ExecutionMode.CONCURRENT)
+@Tag("requires-aws-credentials")
+@Disabled
+class KinesisStreamsSinkITCase {
+
+    private static final String DEFAULT_FIRST_SHARD_NAME = 
"shardId-000000000000";
+
+    private final SerializationSchema<String> serializationSchema = new 
SimpleStringSchema();
+    private final PartitionKeyGenerator<String> partitionKeyGenerator =
+            element -> String.valueOf(element.hashCode());
+    private final PartitionKeyGenerator<String> longPartitionKeyGenerator = 
element -> element;
+
+    private StreamExecutionEnvironment env;
+    private SdkHttpClient httpClient;
+    private KinesisClient kinesisClient;
+    private AWSKinesisResourceManager kinesisResourceManager;
+
+    @BeforeEach
+    void setUp() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        httpClient = AWSServicesTestUtils.createHttpClient();
+        kinesisClient =
+                AWSEndToEndTestUtils.createAwsSyncClient(httpClient, 
KinesisClient.builder());
+        kinesisResourceManager = new AWSKinesisResourceManager(kinesisClient);
+    }
+
+    @AfterEach
+    void teardown() {
+        kinesisResourceManager.close();
+        AWSGeneralUtil.closeResources(httpClient, kinesisClient);
+    }
+
+    @Test
+    void 
elementsMaybeWrittenSuccessfullyToLocalInstanceWhenBatchSizeIsReached() throws 
Exception {
+        new Scenario().runScenario();
+    }
+
+    @Test
+    void 
elementsBufferedAndTriggeredByTimeBasedFlushShouldBeFlushedIfSourcedIsKeptAlive()
+            throws Exception {
+        new Scenario()
+                .withNumberOfElementsToSend(10)
+                .withMaxBatchSize(100)
+                .withExpectedElements(10)
+                .runScenario();
+    }
+
+    @Test
+    void veryLargeMessagesSucceedInBeingPersisted() throws Exception {
+        new Scenario()
+                .withNumberOfElementsToSend(5)
+                .withSizeOfMessageBytes(2500)
+                .withMaxBatchSize(10)
+                .withExpectedElements(5)
+                .runScenario();
+    }
+
+    @Test
+    void multipleInFlightRequestsResultsInCorrectNumberOfElementsPersisted() 
throws Exception {
+        new Scenario()
+                .withNumberOfElementsToSend(150)
+                .withSizeOfMessageBytes(2500)
+                .withBufferMaxTimeMS(2000)
+                .withMaxInflightReqs(10)
+                .withMaxBatchSize(20)
+                .withExpectedElements(150)
+                .runScenario();
+    }
+
+    @Test
+    void nonExistentStreamNameShouldResultInFailureInFailOnErrorIsOn() {
+        testJobFatalFailureTerminatesCorrectlyWithFailOnErrorFlagSetTo(true);
+    }
+
+    @Test
+    void nonExistentStreamNameShouldResultInFailureInFailOnErrorIsOff() {
+        testJobFatalFailureTerminatesCorrectlyWithFailOnErrorFlagSetTo(false);
+    }
+
+    private void 
testJobFatalFailureTerminatesCorrectlyWithFailOnErrorFlagSetTo(
+            boolean failOnError) {
+        Assertions.assertThatExceptionOfType(JobExecutionException.class)
+                .isThrownBy(
+                        () ->
+                                new Scenario()
+                                        .withSinkConnectionStreamName(
+                                                "flink-test-stream-not-exists")
+                                        .withFailOnError(failOnError)
+                                        .runScenario())
+                .havingCause()
+                .havingCause()
+                .havingCause()
+                .withMessageContaining("Stream flink-test-stream-not-exists under account");
+    }
+
+    @Test
+    void veryLargeMessagesFailGracefullyWithBrokenElementConverter() {
+        Assertions.assertThatExceptionOfType(JobExecutionException.class)
+                .isThrownBy(
+                        () ->
+                                new Scenario()
+                                        .withNumberOfElementsToSend(5)
+                                        .withSizeOfMessageBytes(2500)
+                                        .withExpectedElements(5)
+                                        
.withSerializationSchema(serializationSchema)
+                                        
.withPartitionKeyGenerator(longPartitionKeyGenerator)
+                                        .runScenario())
+                .havingCause()
+                .havingCause()
+                .withMessageContaining(
+                        "Encountered an exception while persisting records, not retrying due to {failOnError} being set.");
+    }
+
+    @Test
+    void badEndpointShouldResultInFailureWhenInFailOnErrorIsOn() {
+        badEndpointShouldResultInFailureWhenInFailOnErrorIs(true);
+    }
+
+    @Test
+    void badEndpointShouldResultInFailureWhenInFailOnErrorIsOff() {
+        badEndpointShouldResultInFailureWhenInFailOnErrorIs(false);
+    }
+
+    private void badEndpointShouldResultInFailureWhenInFailOnErrorIs(boolean 
failOnError) {
+        Properties properties = getDefaultProperties();
+        properties.setProperty(AWS_ENDPOINT, "https://bad-endpoint-with-uri");
+        assertRunWithPropertiesAndStreamShouldFailWithExceptionOfType(
+                failOnError,
+                properties,
+                "UnknownHostException when attempting to interact with a service.");
+    }
+
+    @Test
+    void accessDeniedShouldFailJobWhenFailOnErrorIsOn() {
+        accessDeniedShouldFailJobWhenFailOnErrorIs(true);
+    }
+
+    @Test
+    void accessDeniedShouldFailJobWhenFailOnErrorIsOff() {
+        accessDeniedShouldFailJobWhenFailOnErrorIs(false);
+    }
+
+    private void accessDeniedShouldFailJobWhenFailOnErrorIs(boolean 
failOnError) {
+        Assertions.assertThatExceptionOfType(JobExecutionException.class)
+                .isThrownBy(
+                        () ->
+                                new Scenario()
+                                        .withSinkConnectionStreamName(
+                                                
"no-permission-for-this-stream")
+                                        .withFailOnError(failOnError)
+                                        .runScenario())
+                .havingCause()
+                .havingCause()
+                .withMessageContaining(
+                        "Encountered an exception while persisting records, not retrying due to {failOnError} being set.")
+                .havingCause()
+                .withMessageContaining(
+                        "is not authorized to perform: kinesis:PutRecords on resource");
+    }
+
+    private void assertRunWithPropertiesAndStreamShouldFailWithExceptionOfType(
+            boolean failOnError, Properties properties, String 
expectedMessage) {
+        Assertions.assertThatExceptionOfType(JobExecutionException.class)
+                .isThrownBy(
+                        () ->
+                                new Scenario()
+                                        .withFailOnError(failOnError)
+                                        .withProperties(properties)
+                                        .runScenario())
+                .withStackTraceContaining(expectedMessage);
+    }
+
+    private Properties getDefaultProperties() {
+        return AWSEndToEndTestUtils.createTestConfig();
+    }
+
+    private class Scenario {
+        private int numberOfElementsToSend = 50;
+        private int sizeOfMessageBytes = 25;
+        private int bufferMaxTimeMS = 1000;
+        private int maxInflightReqs = 1;
+        private int maxBatchSize = 50;
+        private int expectedElements = 50;
+        private boolean failOnError = false;
+        private String kinesisDataStreamARN = null;
+        private String sinkConnectionStreamName = null;
+        private String sinkConnectionStreamArn = null;
+        private SerializationSchema<String> serializationSchema =
+                KinesisStreamsSinkITCase.this.serializationSchema;
+        private PartitionKeyGenerator<String> partitionKeyGenerator =
+                KinesisStreamsSinkITCase.this.partitionKeyGenerator;
+        private Properties properties = 
KinesisStreamsSinkITCase.this.getDefaultProperties();
+
+        public void runScenario() throws Exception {
+            kinesisDataStreamARN = 
kinesisResourceManager.createKinesisDataStream();
+            if (sinkConnectionStreamName == null) {
+                sinkConnectionStreamArn = kinesisDataStreamARN;
+            }
+
+            DataStream<String> stream =
+                    env.addSource(
+                                    new DataGeneratorSource<>(
+                                            
RandomGenerator.stringGenerator(sizeOfMessageBytes),
+                                            100,
+                                            (long) numberOfElementsToSend))
+                            .returns(String.class);
+
+            KinesisStreamsSink<String> kdsSink =
+                    KinesisStreamsSink.<String>builder()
+                            .setSerializationSchema(serializationSchema)
+                            .setPartitionKeyGenerator(partitionKeyGenerator)
+                            .setMaxTimeInBufferMS(bufferMaxTimeMS)
+                            .setMaxInFlightRequests(maxInflightReqs)
+                            .setMaxBatchSize(maxBatchSize)
+                            .setFailOnError(failOnError)
+                            .setMaxBufferedRequests(1000)
+                            .setStreamName(sinkConnectionStreamName)
+                            .setStreamArn(sinkConnectionStreamArn)
+                            .setKinesisClientProperties(properties)
+                            .setFailOnError(true)
+                            .build();
+
+            stream.sinkTo(kdsSink);
+
+            env.execute("KDS Async Sink Example Program");
+
+            String shardIterator =
+                    kinesisClient
+                            .getShardIterator(
+                                    GetShardIteratorRequest.builder()
+                                            .shardId(DEFAULT_FIRST_SHARD_NAME)
+                                            
.shardIteratorType(ShardIteratorType.TRIM_HORIZON)
+                                            .streamARN(kinesisDataStreamARN)
+                                            .build())
+                            .shardIterator();
+
+            Assertions.assertThat(
+                            kinesisClient
+                                    .getRecords(
+                                            GetRecordsRequest.builder()
+                                                    
.shardIterator(shardIterator)
+                                                    .build())
+                                    .records()
+                                    .size())
+                    .isEqualTo(expectedElements);
+        }
+
+        public Scenario withNumberOfElementsToSend(int numberOfElementsToSend) 
{
+            this.numberOfElementsToSend = numberOfElementsToSend;
+            return this;
+        }
+
+        public Scenario withSizeOfMessageBytes(int sizeOfMessageBytes) {
+            this.sizeOfMessageBytes = sizeOfMessageBytes;
+            return this;
+        }
+
+        public Scenario withBufferMaxTimeMS(int bufferMaxTimeMS) {
+            this.bufferMaxTimeMS = bufferMaxTimeMS;
+            return this;
+        }
+
+        public Scenario withMaxInflightReqs(int maxInflightReqs) {
+            this.maxInflightReqs = maxInflightReqs;
+            return this;
+        }
+
+        public Scenario withMaxBatchSize(int maxBatchSize) {
+            this.maxBatchSize = maxBatchSize;
+            return this;
+        }
+
+        public Scenario withExpectedElements(int expectedElements) {
+            this.expectedElements = expectedElements;
+            return this;
+        }
+
+        public Scenario withFailOnError(boolean failOnError) {
+            this.failOnError = failOnError;
+            return this;
+        }
+
+        public Scenario withSinkConnectionStreamName(String 
sinkConnectionStreamName) {
+            this.sinkConnectionStreamName = sinkConnectionStreamName;
+            return this;
+        }
+
+        public Scenario withSerializationSchema(SerializationSchema<String> 
serializationSchema) {
+            this.serializationSchema = serializationSchema;
+            return this;
+        }
+
+        public Scenario withPartitionKeyGenerator(
+                PartitionKeyGenerator<String> partitionKeyGenerator) {
+            this.partitionKeyGenerator = partitionKeyGenerator;
+            return this;
+        }
+
+        public Scenario withProperties(Properties properties) {
+            this.properties = properties;
+            return this;
+        }
+    }
+}
diff --git 
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/testutils/AWSEndToEndTestUtils.java
 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/testutils/AWSEndToEndTestUtils.java
new file mode 100644
index 0000000..a8d2824
--- /dev/null
+++ 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/testutils/AWSEndToEndTestUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.testutils;
+
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.regions.Region;
+
+import java.util.Properties;
+
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+
+/** Utility class for configuring end-to-end tests. */
+public class AWSEndToEndTestUtils {
+    private static final Region TEST_REGION = Region.AP_SOUTHEAST_1;
+
+    private AWSEndToEndTestUtils() {
+        // Private constructor to prevent initialization.
+    }
+
+    public static <
+                    S extends SdkClient,
+                    T extends
+                            AwsSyncClientBuilder<? extends T, S> & 
AwsClientBuilder<? extends T, S>>
+            S createAwsSyncClient(SdkHttpClient httpClient, T clientBuilder) {
+        Properties config = createTestConfig();
+        return clientBuilder
+                .httpClient(httpClient)
+                
.credentialsProvider(AWSGeneralUtil.getCredentialsProvider(config))
+                .region(AWSGeneralUtil.getRegion(config))
+                .build();
+    }
+
+    public static Properties createTestConfig() {
+        Properties config = new Properties();
+        config.setProperty(AWS_REGION, TEST_REGION.toString());
+        configureTestCredentials(config);
+        return config;
+    }
+
+    private static void configureTestCredentials(Properties config) {
+        Preconditions.checkNotNull(
+                System.getenv("FLINK_AWS_USER"),
+                "FLINK_AWS_USER not configured for end to end test.");
+        Preconditions.checkNotNull(
+                System.getenv("FLINK_AWS_PASSWORD"),
+                "FLINK_AWS_PASSWORD not configured for end to end test.");
+        config.setProperty(
+                AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER),
+                System.getenv("FLINK_AWS_USER"));
+        config.setProperty(
+                AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER),
+                System.getenv("FLINK_AWS_PASSWORD"));
+    }
+}
diff --git 
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/testutils/AWSKinesisResourceManager.java
 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/testutils/AWSKinesisResourceManager.java
new file mode 100644
index 0000000..5e3e621
--- /dev/null
+++ 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/testutils/AWSKinesisResourceManager.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.testutils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
+import software.amazon.awssdk.services.kinesis.model.StreamMode;
+import software.amazon.awssdk.services.kinesis.model.StreamModeDetails;
+import software.amazon.awssdk.services.kinesis.model.StreamSummary;
+import software.amazon.awssdk.services.kinesis.paginators.ListStreamsIterable;
+import software.amazon.awssdk.services.kinesis.waiters.KinesisWaiter;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+/** Resource manager to create and clean up Kinesis Data Stream resources used 
for each test. */
+public class AWSKinesisResourceManager implements AutoCloseable {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AWSKinesisResourceManager.class);
+
+    private static final String KINESIS_DATA_STREAMS_NAME_PREFIX = 
"flink-test-";
+    private static final int DEFAULT_SHARD_COUNT = 1;
+    private final KinesisClient kinesisClient;
+    private final KinesisWaiter kinesisWaiter;
+    private final Set<String> kinesisDataStreamARNs = new HashSet<>();
+
+    public AWSKinesisResourceManager(KinesisClient kinesisClient) {
+        this.kinesisClient = kinesisClient;
+        this.kinesisWaiter = 
KinesisWaiter.builder().client(kinesisClient).build();
+    }
+
+    public String createKinesisDataStream() {
+        String streamName = KINESIS_DATA_STREAMS_NAME_PREFIX + 
UUID.randomUUID();
+        LOGGER.info("Creating Kinesis Data Stream with name {}.", streamName);
+        kinesisClient.createStream(
+                CreateStreamRequest.builder()
+                        .streamName(streamName)
+                        .shardCount(DEFAULT_SHARD_COUNT)
+                        .streamModeDetails(
+                                StreamModeDetails.builder()
+                                        .streamMode(StreamMode.PROVISIONED)
+                                        .build())
+                        .build());
+        DescribeStreamResponse response =
+                kinesisClient.describeStream(
+                        
DescribeStreamRequest.builder().streamName(streamName).build());
+        String streamARN = response.streamDescription().streamARN();
+        kinesisDataStreamARNs.add(streamARN);
+        LOGGER.info("Successfully created Kinesis Data Stream with ARN {}.", 
streamARN);
+
+        LOGGER.info("Waiting until Kinesis Data Stream with ARN {} is ACTIVE.", streamARN);
+        kinesisWaiter.waitUntilStreamExists(
+                DescribeStreamRequest.builder().streamARN(streamARN).build());
+        return streamARN;
+    }
+
+    public void close() {
+        kinesisDataStreamARNs.forEach(this::deleteStream);
+    }
+
+    public void cleanUpStaleKinesisDataStreams() {
+        // Delete all test streams created at least 2 days ago.
+        Instant twoDaysAgo = Instant.now().minus(2, ChronoUnit.DAYS);
+
+        LOGGER.info(
+                "Listing all Kinesis Data Stream with prefix {} created more than 2 days ago.",
+                KINESIS_DATA_STREAMS_NAME_PREFIX);
+        ListStreamsIterable listStreamsResponses = 
kinesisClient.listStreamsPaginator();
+        listStreamsResponses.forEach(
+                listStreamsResponse ->
+                        listStreamsResponse.streamSummaries().stream()
+                                .filter(
+                                        streamSummary ->
+                                                streamSummary
+                                                        .streamName()
+                                                        .startsWith(
+                                                                
KINESIS_DATA_STREAMS_NAME_PREFIX))
+                                .filter(
+                                        streamSummary ->
+                                                streamSummary
+                                                        
.streamCreationTimestamp()
+                                                        .isBefore(twoDaysAgo))
+                                .map(StreamSummary::streamARN)
+                                .forEach(
+                                        streamARN -> {
+                                            LOGGER.warn(
+                                                    "Found stale Kinesis Data Stream with ARN {}. Deleting stream",
+                                                    streamARN);
+                                            deleteStream(streamARN);
+                                        }));
+    }
+
+    private void deleteStream(String streamARN) {
+        LOGGER.info("Deleting Kinesis Data Stream with ARN {}.", streamARN);
+        
kinesisClient.deleteStream(DeleteStreamRequest.builder().streamARN(streamARN).build());
+    }
+}
diff --git 
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/testutils/StaleResourceCleanupITCase.java
 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/testutils/StaleResourceCleanupITCase.java
new file mode 100644
index 0000000..66cad7c
--- /dev/null
+++ 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/testutils/StaleResourceCleanupITCase.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.testutils;
+
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+
+/** Housekeeping "test" with no assertions: it builds a KinesisClient and only calls AWSKinesisResourceManager#cleanUpStaleKinesisDataStreams(). */
+@Tag("requires-aws-credentials")
+public class StaleResourceCleanupITCase {
+    @Test
+    void cleanUpStaleKinesisStreams() {
+        try (SdkHttpClient httpClient = 
AWSServicesTestUtils.createHttpClient();
+                KinesisClient kinesisClient =
+                        AWSEndToEndTestUtils.createAwsSyncClient(
+                                httpClient, KinesisClient.builder());
+                AWSKinesisResourceManager kinesisResourceManager =
+                        new AWSKinesisResourceManager(kinesisClient); ) {
+            kinesisResourceManager.cleanUpStaleKinesisDataStreams(); // the only action performed; nothing is asserted afterwards
+        }
+    }
+}
diff --git 
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/junit-platform.properties
 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/junit-platform.properties
new file mode 100644
index 0000000..5f842f1
--- /dev/null
+++ 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/junit-platform.properties
@@ -0,0 +1,4 @@
+junit.jupiter.execution.parallel.enabled = true
+junit.jupiter.execution.parallel.config.strategy = fixed
+# We set a parallelism of 4 because the maximum number of Kinesis Data Streams in CREATING/DELETING state is 5.
+junit.jupiter.execution.parallel.config.fixed.parallelism = 4
diff --git 
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/log4j2-test.properties
 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/log4j2-test.properties
index 835c2ec..c1ab1dd 100644
--- 
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/log4j2-test.properties
+++ 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/log4j2-test.properties
@@ -26,3 +26,6 @@ appender.testlogger.type = CONSOLE
 appender.testlogger.target = SYSTEM_ERR
 appender.testlogger.layout.type = PatternLayout
 appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+
+logger.resourceManager.name = 
org.apache.flink.connector.kinesis.testutils.AWSKinesisResourceManager
+logger.resourceManager.level = WARN
diff --git a/flink-connector-aws-e2e-tests/pom.xml 
b/flink-connector-aws-e2e-tests/pom.xml
index b46b4ec..a0b126d 100644
--- a/flink-connector-aws-e2e-tests/pom.xml
+++ b/flink-connector-aws-e2e-tests/pom.xml
@@ -88,6 +88,38 @@ under the License.
                                     <systemPropertyVariables>
                                         
<moduleDir>${project.basedir}</moduleDir>
                                     </systemPropertyVariables>
+                                    
<excludedGroups>requires-aws-credentials</excludedGroups>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <profile>
+            <id>run-aws-end-to-end-tests</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-surefire-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>end-to-end-tests</id>
+                                <phase>integration-test</phase>
+                                <goals>
+                                    <goal>test</goal>
+                                </goals>
+                                <configuration>
+                                    <includes>
+                                        <include>**/*.*</include>
+                                    </includes>
+                                    <groups>requires-aws-credentials</groups>
+                                    <!-- E2E tests must not access flink-dist concurrently. -->
+                                    <forkCount>1</forkCount>
+                                    <systemPropertyVariables>
+                                        
<moduleDir>${project.basedir}</moduleDir>
+                                    </systemPropertyVariables>
                                 </configuration>
                             </execution>
                         </executions>


Reply via email to