This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git

commit 79f627af36c9758ecd28419e18731477209a4965
Author: Danny Cranmer <[email protected]>
AuthorDate: Sat Dec 3 19:47:20 2022 +0000

    [FLINK-29908][Connectors/Firehose] Externalize and configure E2E tests for 
Firehose connectors
---
 .github/workflows/ci.yml                           |  20 +-
 flink-connector-aws-e2e-tests/README.md            |  14 ++
 .../pom.xml                                        | 143 ++++++++++++
 .../table/test/KinesisFirehoseTableITTest.java     | 257 +++++++++++++++++++++
 .../src/test/resources/log4j2-test.properties      |  28 +++
 .../src/test/resources/send-orders.sql             |  36 +++
 flink-connector-aws-e2e-tests/pom.xml              | 114 +++++++++
 pom.xml                                            |   2 +
 8 files changed, 613 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c2ee3af..76962f3 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -29,6 +29,7 @@ jobs:
       MVN_COMMON_OPTIONS: -U -B --no-transfer-progress
       MVN_CONNECTION_OPTIONS: -Dhttp.keepAlive=false 
-Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
       FLINK_URL: https://dlcdn.apache.org/flink/flink-${{ matrix.flink 
}}/flink-${{ matrix.flink }}-bin-scala_2.12.tgz
+      FLINK_CACHE_DIR: "/tmp/cache/flink"
       MVN_BUILD_OUTPUT_FILE: "/tmp/mvn_build_output.out"
       MVN_VALIDATION_DIR: "/tmp/flink-validation-deployment"
     steps:
@@ -49,13 +50,30 @@ jobs:
         with:
           maven-version: 3.8.5
 
+      - name: Create cache dirs
+        run: mkdir -p ${{ env.FLINK_CACHE_DIR }}
+
+      - name: Cache Flink binary
+        if: ${{ inputs.cache_flink_binary == 'true' }}
+        uses: actions/cache@v3
+        id: cache-flink
+        with:
+          path: ${{ env.FLINK_CACHE_DIR }}
+          key: ${{ inputs.flink_url }}
+
+      - name: Download Flink binary
+        working-directory: ${{ env.FLINK_CACHE_DIR }}
+        if: steps.cache-flink.outputs.cache-hit != 'true'
+        run: wget -q -c ${{ inputs.flink_url }} -O - | tar -xz
+
       - name: Compile and test flink-connector-dynamodb
         run: |
           set -o pipefail
           
           mvn clean install -Dflink.convergence.phase=install 
-Pcheck-convergence -U -B ${{ env.MVN_CONNECTION_OPTIONS }} \
             -DaltDeploymentRepository=validation_repository::default::file:${{ 
env.MVN_VALIDATION_DIR }} \
-            -Dflink.version=${{ matrix.flink }} | tee ${{ 
env.MVN_BUILD_OUTPUT_FILE }}
+            -Dflink.version=${{ matrix.flink }} | tee ${{ 
env.MVN_BUILD_OUTPUT_FILE }} \
+            -Prun-end-to-end-tests -DdistDir=${{ env.FLINK_CACHE_DIR 
}}/flink-${{ matrix.flink }} \
 
       - name: Check licensing
         run: |
diff --git a/flink-connector-aws-e2e-tests/README.md 
b/flink-connector-aws-e2e-tests/README.md
new file mode 100644
index 0000000..1308ca7
--- /dev/null
+++ b/flink-connector-aws-e2e-tests/README.md
@@ -0,0 +1,14 @@
+# Apache Flink AWS Connectors end-to-end tests
+
+To run the end-to-end tests you will need a `flink-dist`. You can build Flink 
from source or download 
+from https://dist.apache.org/repos/dist/release/flink. For example, download
+[flink-1.16.0-bin-scala_2.12.tgz](https://dist.apache.org/repos/dist/release/flink/flink-1.16.0/flink-1.16.0-bin-scala_2.12.tgz)
+and extract, then find `flink-dist-1.16.0.jar` in the `lib` folder.
+
+The end-to-end tests are disabled by default, to run them you can use the 
`run-end-to-end-tests` maven profile.
+
+Example command to run end-to-end tests:
+```
+mvn clean verify -Prun-end-to-end-tests 
-DdistDir=<path-to-dist>/flink-1.16.0/lib/flink-dist-1.16.0.jar 
+```
+
diff --git 
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml
 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml
new file mode 100644
index 0000000..8c9a1c3
--- /dev/null
+++ 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>flink-connector-aws-e2e-tests-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>4.0-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-connector-aws-kinesis-firehose-e2e-tests</artifactId>
+    <name>Flink : Connectors : AWS : E2E Tests : Amazon Kinesis Firehose e2e 
tests</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-kinesis-firehose</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>iam</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy</id>
+                        <phase>pre-integration-test</phase>
+                        <goals>
+                            <goal>copy</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <artifactItems>
+                        <artifactItem>
+                            <groupId>org.apache.flink</groupId>
+                            
<artifactId>flink-sql-connector-aws-kinesis-firehose</artifactId>
+                            <version>${project.version}</version>
+                            
<destFileName>sql-kinesis-firehose.jar</destFileName>
+                            <type>jar</type>
+                            
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
+                    </artifactItems>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.22.1</version>
+                <configuration>
+                    <systemPropertyVariables>
+                        <!-- Required for Kinesalite. -->
+                        <!-- Including shaded and non-shaded conf to support 
test running from Maven and IntelliJ -->
+                        
<com.amazonaws.sdk.disableCbor>true</com.amazonaws.sdk.disableCbor>
+                        
<com.amazonaws.sdk.disableCertChecking>true</com.amazonaws.sdk.disableCertChecking>
+                        
<org.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCbor>true
+                        
</org.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCbor>
+                        
<org.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCertChecking>true
+                        
</org.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCertChecking>
+                    </systemPropertyVariables>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java
 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java
new file mode 100644
index 0000000..76be9a4
--- /dev/null
+++ 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.table.test;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.test.resources.ResourceTestUtils;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.firehose.FirehoseClient;
+import software.amazon.awssdk.services.iam.IamClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createBucket;
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createHttpClient;
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createIAMRole;
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createIamClient;
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createS3Client;
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.listBucketObjects;
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.readObjectsFromS3Bucket;
+import static 
org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createDeliveryStream;
+import static 
org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createFirehoseClient;
+
+/** End to End test for Kinesis Firehose Table sink API. */
+public class KinesisFirehoseTableITTest extends TestLogger {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisFirehoseTableITTest.class);
+
+    private static final String ROLE_NAME = "super-role";
+    private static final String ROLE_ARN = "arn:aws:iam::000000000000:role/" + 
ROLE_NAME;
+    private static final String BUCKET_NAME = "s3-firehose";
+    private static final String STREAM_NAME = "s3-stream";
+
+    private static final ObjectMapper OBJECT_MAPPER = 
JacksonMapperFactory.createObjectMapper();
+
+    private final Path sqlConnectorFirehoseJar = 
ResourceTestUtils.getResource(".*firehose.jar");
+
+    private SdkHttpClient httpClient;
+    private S3Client s3Client;
+    private FirehoseClient firehoseClient;
+    private IamClient iamClient;
+
+    private static final int NUM_ELEMENTS = 5;
+    private static final Network network = Network.newNetwork();
+
+    @ClassRule public static final Timeout TIMEOUT = new Timeout(10, 
TimeUnit.MINUTES);
+
+    @ClassRule
+    public static LocalstackContainer mockFirehoseContainer =
+            new 
LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK))
+                    .withNetwork(network)
+                    .withNetworkAliases("localstack");
+
+    public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
+            TestcontainersSettings.builder()
+                    .environmentVariable("AWS_CBOR_DISABLE", "1")
+                    .environmentVariable(
+                            "FLINK_ENV_JAVA_OPTS",
+                            
"-Dorg.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCertChecking
 -Daws.cborEnabled=false")
+                    .network(network)
+                    .logger(LOG)
+                    .dependsOn(mockFirehoseContainer)
+                    .build();
+
+    public static final FlinkContainers FLINK =
+            
FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build();
+
+    @Before
+    public void setup() {
+        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
+
+        httpClient = createHttpClient();
+
+        s3Client = createS3Client(mockFirehoseContainer.getEndpoint(), 
httpClient);
+        firehoseClient = 
createFirehoseClient(mockFirehoseContainer.getEndpoint(), httpClient);
+        iamClient = createIamClient(mockFirehoseContainer.getEndpoint(), 
httpClient);
+
+        LOG.info("1 - Creating the bucket for Firehose to deliver into...");
+        createBucket(s3Client, BUCKET_NAME);
+
+        LOG.info("2 - Creating the IAM Role for Firehose to write into the s3 
bucket...");
+        createIAMRole(iamClient, ROLE_NAME);
+
+        LOG.info("3 - Creating the Firehose delivery stream...");
+        createDeliveryStream(STREAM_NAME, BUCKET_NAME, ROLE_ARN, 
firehoseClient);
+
+        LOG.info("Done setting up the localstack.");
+    }
+
+    @BeforeClass
+    public static void setupFlink() throws Exception {
+        FLINK.start();
+    }
+
+    @AfterClass
+    public static void stopFlink() {
+        FLINK.stop();
+    }
+
+    @After
+    public void teardown() {
+        System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
+
+        s3Client.close();
+        firehoseClient.close();
+        iamClient.close();
+        httpClient.close();
+    }
+
+    @Test
+    public void testTableApiSink() throws Exception {
+        List<Order> orderList = getTestOrders();
+
+        executeSqlStatements(readSqlFile("send-orders.sql"));
+        List<Order> orders = readFromS3();
+        Assertions.assertThat(orders).containsAll(orderList);
+    }
+
+    private List<Order> getTestOrders() {
+        return IntStream.range(1, NUM_ELEMENTS)
+                .mapToObj(this::getOrderWithOffset)
+                .collect(Collectors.toList());
+    }
+
+    private Order getOrderWithOffset(int offset) {
+        return new Order(String.valueOf((char) ('A' + offset - 1)), offset);
+    }
+
+    private void executeSqlStatements(final List<String> sqlLines) throws 
Exception {
+        FLINK.submitSQLJob(
+                new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
+                        .addJars(sqlConnectorFirehoseJar)
+                        .build());
+    }
+
+    private List<String> readSqlFile(final String resourceName) throws 
Exception {
+        return Files.readAllLines(Paths.get(getClass().getResource("/" + 
resourceName).toURI()));
+    }
+
+    private List<Order> readFromS3() throws Exception {
+
+        Deadline deadline = Deadline.fromNow(Duration.ofMinutes(1));
+        List<S3Object> ordersObjects;
+        List<Order> orders;
+        do {
+            Thread.sleep(1000);
+            ordersObjects = listBucketObjects(s3Client, BUCKET_NAME);
+            orders =
+                    readObjectsFromS3Bucket(
+                            s3Client,
+                            ordersObjects,
+                            BUCKET_NAME,
+                            responseBytes ->
+                                    fromJson(
+                                            new 
String(responseBytes.asByteArrayUnsafe()),
+                                            Order.class));
+        } while (deadline.hasTimeLeft() && orders.size() < NUM_ELEMENTS);
+
+        return orders;
+    }
+
+    private <T> T fromJson(final String json, final Class<T> type) {
+        try {
+            return OBJECT_MAPPER.readValue(json, type);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(String.format("Failed to deserialize 
json: %s", json), e);
+        }
+    }
+
+    /** POJO model class for sending and receiving records on Kinesis during 
e2e test. */
+    public static class Order {
+        private final String code;
+        private final int quantity;
+
+        @JsonCreator
+        public Order(
+                @JsonProperty("code") final String code, 
@JsonProperty("quantity") int quantity) {
+            this.code = code;
+            this.quantity = quantity;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            Order order = (Order) o;
+            return quantity == order.quantity && Objects.equals(code, 
order.code);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(code, quantity);
+        }
+
+        @Override
+        public String toString() {
+            return "Order{" + "code='" + code + '\'' + ", quantity=" + 
quantity + '}';
+        }
+    }
+}
diff --git 
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/src/test/resources/log4j2-test.properties
 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..835c2ec
--- /dev/null
+++ 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git 
a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/src/test/resources/send-orders.sql
 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/src/test/resources/send-orders.sql
new file mode 100644
index 0000000..4117079
--- /dev/null
+++ 
b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/src/test/resources/send-orders.sql
@@ -0,0 +1,36 @@
+--/*
+-- * Licensed to the Apache Software Foundation (ASF) under one
+-- * or more contributor license agreements.  See the NOTICE file
+-- * distributed with this work for additional information
+-- * regarding copyright ownership.  The ASF licenses this file
+-- * to you under the Apache License, Version 2.0 (the
+-- * "License"); you may not use this file except in compliance
+-- * with the License.  You may obtain a copy of the License at
+-- *
+-- *     http://www.apache.org/licenses/LICENSE-2.0
+-- *
+-- * Unless required by applicable law or agreed to in writing, software
+-- * distributed under the License is distributed on an "AS IS" BASIS,
+-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- * See the License for the specific language governing permissions and
+-- * limitations under the License.
+-- */
+
+CREATE TABLE orders (
+  `code` STRING,
+  `quantity` BIGINT
+) WITH (
+  'connector' = 'firehose',
+  'delivery-stream' = 's3-stream',
+  'aws.region' = 'ap-southeast-1',
+  'aws.endpoint' = 'https://localstack:4566',
+  'aws.credentials.provider' = 'BASIC',
+  'aws.credentials.basic.accesskeyid' = 'accessKeyId',
+  'aws.credentials.basic.secretkey' = 'secretAccessKey',
+  'aws.trust.all.certificates' = 'true',
+  'sink.http-client.protocol.version' = 'HTTP1_1',
+  'sink.batch.max-size' = '1',
+  'format' = 'json'
+);
+
+INSERT INTO orders VALUES ('A',1),('B',2),('C',3),('D',4),('E',5);
diff --git a/flink-connector-aws-e2e-tests/pom.xml 
b/flink-connector-aws-e2e-tests/pom.xml
new file mode 100644
index 0000000..0835d85
--- /dev/null
+++ b/flink-connector-aws-e2e-tests/pom.xml
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-aws-parent</artifactId>
+        <version>4.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-connector-aws-e2e-tests-parent</artifactId>
+    <version>4.0-SNAPSHOT</version>
+    <name>Flink : Connectors : AWS : E2E Tests : Parent</name>
+    <packaging>pom</packaging>
+
+    <properties>
+        <scala.binary.version>2.12</scala.binary.version>
+    </properties>
+
+    <modules>
+        <module>flink-connector-aws-kinesis-firehose-e2e-tests</module>
+    </modules>
+
+    <dependencyManagement>
+        <!-- For dependency convergence -->
+        <dependencies>
+            <dependency>
+                <groupId>org.scala-lang</groupId>
+                <artifactId>scala-library</artifactId>
+                <version>2.12.7</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <profiles>
+        <profile>
+            <id>run-end-to-end-tests</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-surefire-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>end-to-end-tests</id>
+                                <phase>integration-test</phase>
+                                <goals>
+                                    <goal>test</goal>
+                                </goals>
+                                <configuration>
+                                    <includes>
+                                        <include>**/*.*</include>
+                                    </includes>
+                                    <!-- E2E tests must not access flink-dist 
concurrently. -->
+                                    <forkCount>1</forkCount>
+                                    <systemPropertyVariables>
+                                        
<moduleDir>${project.basedir}</moduleDir>
+                                    </systemPropertyVariables>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>default-test</id>
+                        <phase>none</phase>
+                    </execution>
+                    <execution>
+                        <id>integration-tests</id>
+                        <phase>none</phase>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/pom.xml b/pom.xml
index a1f0040..091ec33 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,6 +82,8 @@ under the License.
         <module>flink-sql-connector-aws-kinesis-firehose</module>
         <module>flink-sql-connector-aws-kinesis-streams</module>
         <module>flink-sql-connector-kinesis</module>
+
+        <module>flink-connector-aws-e2e-tests</module>
     </modules>
 
     <dependencies>

Reply via email to