This is an automated email from the ASF dual-hosted git repository.
hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new b681cfc [FLINK-31986][Connectors/Kinesis] Setup integration tests for source
b681cfc is described below
commit b681cfcd372a44a34c4e6c868cb8a69d50aac67c
Author: Elphas Toringepi <[email protected]>
AuthorDate: Fri Sep 27 17:14:50 2024 +0100
[FLINK-31986][Connectors/Kinesis] Setup integration tests for source
---
.../flink/connector/aws/util/AWSGeneralUtil.java | 21 +-
.../aws/testutils/AWSServicesTestUtils.java | 6 +-
.../connector/aws/util/AWSClientUtilTest.java | 20 ++
.../flink-connector-aws-kinesis-streams/pom.xml | 6 +
.../kinesis/source/KinesisStreamsSource.java | 9 +-
.../kinesis/source/KinesisStreamsSourceITCase.java | 220 +++++++++++++++++++++
6 files changed, 272 insertions(+), 10 deletions(-)
diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
index cea8248..5325b43 100644
--- a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
+++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
@@ -297,9 +297,8 @@ public class AWSGeneralUtil {
return createAsyncHttpClient(configProperties,
NettyNioAsyncHttpClient.builder());
}
- public static SdkAsyncHttpClient createAsyncHttpClient(
- final Properties configProperties,
- final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+ @VisibleForTesting
+ static AttributeMap getSdkHttpConfigurationOptions(final Properties
configProperties) {
final AttributeMap.Builder clientConfiguration =
AttributeMap.builder().put(SdkHttpConfigurationOption.TCP_KEEPALIVE, true);
@@ -335,7 +334,15 @@ public class AWSGeneralUtil {
protocol ->
clientConfiguration.put(
SdkHttpConfigurationOption.PROTOCOL,
protocol));
- return createAsyncHttpClient(clientConfiguration.build(),
httpClientBuilder);
+
+ return clientConfiguration.build();
+ }
+
+ /**
+ * Creates an async HTTP client configured from connector properties.
+ *
+ * <p>The properties are first converted into SDK HTTP options by
+ * {@code getSdkHttpConfigurationOptions}; the resulting options and {@code httpClientBuilder}
+ * are then passed to the {@code AttributeMap}-based {@code createAsyncHttpClient} overload.
+ *
+ * @param configProperties connector properties holding the HTTP settings
+ * @param httpClientBuilder Netty builder used to construct the client
+ * @return the configured async HTTP client
+ */
+ public static SdkAsyncHttpClient createAsyncHttpClient(
+ final Properties configProperties,
+ final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+ return createAsyncHttpClient(
+ getSdkHttpConfigurationOptions(configProperties),
httpClientBuilder);
}
public static SdkAsyncHttpClient createAsyncHttpClient(
@@ -355,6 +362,12 @@ public class AWSGeneralUtil {
return
httpClientBuilder.buildWithDefaults(config.merge(HTTP_CLIENT_DEFAULTS));
}
+ /**
+ * Creates a sync HTTP client configured from connector properties.
+ *
+ * <p>The properties are converted into SDK HTTP options by
+ * {@code getSdkHttpConfigurationOptions}; the resulting options and {@code httpClientBuilder}
+ * are then passed to the {@code AttributeMap}-based {@code createSyncHttpClient} overload.
+ *
+ * @param configProperties connector properties holding the HTTP settings
+ * @param httpClientBuilder Apache builder used to construct the client
+ * @return the configured sync HTTP client
+ */
+ public static SdkHttpClient createSyncHttpClient(
+ final Properties configProperties, final ApacheHttpClient.Builder
httpClientBuilder) {
+ return createSyncHttpClient(
+ getSdkHttpConfigurationOptions(configProperties),
httpClientBuilder);
+ }
+
public static SdkHttpClient createSyncHttpClient(
final AttributeMap config, final ApacheHttpClient.Builder
httpClientBuilder) {
httpClientBuilder.connectionAcquisitionTimeout(CONNECTION_ACQUISITION_TIMEOUT);
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
index 207f567..5bea73b 100644
--- a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
@@ -115,7 +115,11 @@ public class AWSServicesTestUtils {
}
+ /**
+ * Creates an IAM role named {@code roleName} through the given {@link IamClient}.
+ *
+ * @param iam client used to issue the {@code CreateRole} request
+ * @param roleName name of the role to create
+ */
public static void createIAMRole(IamClient iam, String roleName) {
- CreateRoleRequest request =
CreateRoleRequest.builder().roleName(roleName).build();
+ CreateRoleRequest request =
+ CreateRoleRequest.builder()
+ .roleName(roleName)
+ // NOTE(review): "{}" is an empty JSON object, not a real trust policy; presumably
+ // only accepted by LocalStack - confirm before using this helper against real IAM.
+ .assumeRolePolicyDocument("{}")
+ .build();
iam.createRole(request);
}
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java
index 0c2c2a9..67331e3 100644
--- a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java
@@ -17,6 +17,8 @@
package org.apache.flink.connector.aws.util;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
@@ -28,7 +30,9 @@ import
software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
import software.amazon.awssdk.core.client.config.SdkClientOption;
+import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
@@ -50,6 +54,7 @@ import java.util.Properties;
import static
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
import static
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
import static
org.apache.flink.connector.aws.util.AWSClientUtil.formatFlinkUserAgentPrefix;
+import static
org.apache.flink.connector.aws.util.AWSGeneralUtil.getSdkHttpConfigurationOptions;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
@@ -204,6 +209,21 @@ class AWSClientUtilTest {
.isTrue();
}
+ /**
+ * Checks that {@code getSdkHttpConfigurationOptions} always enables TCP keep-alive, maps the
+ * trust-all-certificates and HTTP protocol version properties onto SDK options, and leaves
+ * options that were not configured (max connections, read timeout) absent.
+ */
+ @Test
+ void testGetSdkHttpConfigurationOptions() {
+ Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2");
+ properties.setProperty(AWSConfigConstants.TRUST_ALL_CERTIFICATES,
"true");
+ properties.setProperty(AWSConfigConstants.HTTP_PROTOCOL_VERSION,
"HTTP1_1");
+ AttributeMap options = getSdkHttpConfigurationOptions(properties);
+
+
assertThat(options.get(SdkHttpConfigurationOption.TCP_KEEPALIVE).booleanValue()).isTrue();
+
assertThat(options.containsKey(SdkHttpConfigurationOption.MAX_CONNECTIONS)).isFalse();
+
assertThat(options.containsKey(SdkHttpConfigurationOption.READ_TIMEOUT)).isFalse();
+
assertThat(options.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES).booleanValue())
+ .isTrue();
+
assertThat(options.get(SdkHttpConfigurationOption.PROTOCOL)).isEqualTo(Protocol.HTTP1_1);
+ }
+
@Test
void testCreateKinesisAsyncClientWithEndpointOverride() {
Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2");
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml b/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
index 639dbbb..dc3f6a1 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
@@ -122,6 +122,12 @@ under the License.
<scope>test</scope>
</dependency>
+ <!-- NOTE(review): KinesisStreamsSourceITCase does not import any S3 classes; this test
+ dependency is presumably needed at runtime by the LocalstackContainer test utility. Confirm. -->
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>s3</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
index 1ba3704..7838357 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
@@ -57,7 +57,6 @@ import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.retries.api.BackoffStrategy;
import software.amazon.awssdk.retries.api.RetryStrategy;
import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.utils.AttributeMap;
import java.time.Duration;
import java.util.Map;
@@ -193,10 +192,6 @@ public class KinesisStreamsSource<T>
}
private KinesisStreamProxy createKinesisStreamProxy(Configuration
consumerConfig) {
- SdkHttpClient httpClient =
- AWSGeneralUtil.createSyncHttpClient(
- AttributeMap.builder().build(),
ApacheHttpClient.builder());
-
String region =
AWSGeneralUtil.getRegionFromArn(streamArn)
.orElseThrow(
@@ -219,6 +214,10 @@ public class KinesisStreamsSource<T>
AWSConfigOptions
.RETRY_STRATEGY_MAX_ATTEMPTS_OPTION)));
+ SdkHttpClient httpClient =
+ AWSGeneralUtil.createSyncHttpClient(
+ kinesisClientProperties, ApacheHttpClient.builder());
+
AWSGeneralUtil.validateAwsCredentials(kinesisClientProperties);
KinesisClient kinesisClient =
AWSClientUtil.createAwsSyncClient(
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java
new file mode 100644
index 0000000..4a2adec
--- /dev/null
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.rnorth.ducttape.ratelimits.RateLimiter;
+import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
+import software.amazon.awssdk.services.kinesis.model.StreamStatus;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ACCESS_KEY_ID;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+
+/** Integration tests for {@link KinesisStreamsSource} running against a LocalStack container. */
+@Testcontainers
+@ExtendWith(MiniClusterExtension.class)
+public class KinesisStreamsSourceITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceITCase.class);
+    private static final String LOCALSTACK_DOCKER_IMAGE_VERSION = "localstack/localstack:3.7.2";
+
+    @Container
+    private static final LocalstackContainer MOCK_KINESIS_CONTAINER =
+            new LocalstackContainer(DockerImageName.parse(LOCALSTACK_DOCKER_IMAGE_VERSION));
+
+    private StreamExecutionEnvironment env;
+    private SdkHttpClient httpClient;
+    private KinesisClient kinesisClient;
+
+    @BeforeEach
+    void setUp() {
+        // Turn off the SDK's CBOR wire format for the duration of the test (cleared in teardown).
+        // NOTE(review): presumably needed for the LocalStack Kinesis endpoint - confirm.
+        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
+
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        httpClient = AWSServicesTestUtils.createHttpClient();
+        kinesisClient =
+                AWSServicesTestUtils.createAwsSyncClient(
+                        MOCK_KINESIS_CONTAINER.getEndpoint(), httpClient, KinesisClient.builder());
+    }
+
+    @AfterEach
+    void teardown() {
+        System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
+        AWSGeneralUtil.closeResources(httpClient, kinesisClient);
+    }
+
+    /** Reading from an ARN that matches no created stream must fail with a "not found" error. */
+    @Test
+    void nonExistentStreamShouldResultInFailure() {
+        Assertions.assertThatExceptionOfType(RuntimeException.class)
+                .isThrownBy(
+                        () ->
+                                new Scenario()
+                                        .localstackStreamName("stream-exists")
+                                        .withSourceConnectionStreamArn(
+                                                "arn:aws:kinesis:ap-southeast-1:000000000000:stream/stream-not-exists")
+                                        .runScenario())
+                .withStackTraceContaining(
+                        "Stream arn arn:aws:kinesis:ap-southeast-1:000000000000:stream/stream-not-exists not found");
+    }
+
+    /** Every record written to an existing stream is read back by the source. */
+    @Test
+    void validStreamIsConsumed() throws Exception {
+        new Scenario()
+                .localstackStreamName("valid-stream")
+                .withSourceConnectionStreamArn(
+                        "arn:aws:kinesis:ap-southeast-1:000000000000:stream/valid-stream")
+                .runScenario();
+    }
+
+    /** Returns a source configuration pointing at the LocalStack endpoint with dummy credentials. */
+    private Configuration getDefaultConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.setString(AWS_ENDPOINT, MOCK_KINESIS_CONTAINER.getEndpoint());
+        configuration.setString(AWS_ACCESS_KEY_ID, "accessKeyId");
+        configuration.setString(AWS_SECRET_ACCESS_KEY, "secretAccessKey");
+        configuration.setString(AWS_REGION, Region.AP_SOUTHEAST_1.toString());
+        configuration.setString(TRUST_ALL_CERTIFICATES, "true");
+        configuration.setString(HTTP_PROTOCOL_VERSION, "HTTP1_1");
+        return configuration;
+    }
+
+    /**
+     * Fluent description of a single source test run.
+     *
+     * <p>When {@code localstackStreamName} is set, a stream with that name is created in LocalStack
+     * and {@code expectedElements} records are written to it. The source then reads from {@code
+     * sourceConnectionStreamArn}.
+     */
+    private class Scenario {
+        private final int expectedElements = 1000;
+        private String localstackStreamName = null;
+        private String sourceConnectionStreamArn;
+        private final Configuration configuration =
+                KinesisStreamsSourceITCase.this.getDefaultConfiguration();
+
+        /**
+         * Prepares the LocalStack stream (if one was requested), runs the source and checks that
+         * the collected records are exactly the payloads that were written.
+         */
+        public void runScenario() throws Exception {
+            // Only create and populate a stream when one was requested: writing to a null stream
+            // name would fail in test setup before the source under test is ever exercised.
+            if (localstackStreamName != null) {
+                prepareStream(localstackStreamName);
+                putRecords(localstackStreamName, expectedElements);
+            }
+
+            KinesisStreamsSource<String> kdsSource =
+                    KinesisStreamsSource.<String>builder()
+                            .setStreamArn(sourceConnectionStreamArn)
+                            .setSourceConfig(configuration)
+                            .setDeserializationSchema(new SimpleStringSchema())
+                            .build();
+
+            List<String> result =
+                    env.fromSource(kdsSource, WatermarkStrategy.noWatermarks(), "Kinesis source")
+                            .returns(TypeInformation.of(String.class))
+                            .executeAndCollect(expectedElements);
+
+            // Compare payloads, not just the count, so duplicated or corrupted records are caught.
+            Assertions.assertThat(result)
+                    .containsExactlyInAnyOrderElementsOf(expectedRecords(expectedElements));
+        }
+
+        public Scenario withSourceConnectionStreamArn(String sourceConnectionStreamArn) {
+            this.sourceConnectionStreamArn = sourceConnectionStreamArn;
+            return this;
+        }
+
+        public Scenario localstackStreamName(String localstackStreamName) {
+            this.localstackStreamName = localstackStreamName;
+            return this;
+        }
+
+        /** Creates a single-shard stream and waits up to one minute for it to become ACTIVE. */
+        private void prepareStream(String streamName) throws Exception {
+            // Poll the stream status at most once per second.
+            final RateLimiter rateLimiter =
+                    RateLimiterBuilder.newBuilder()
+                            .withRate(1, SECONDS)
+                            .withConstantThroughput()
+                            .build();
+
+            kinesisClient.createStream(
+                    CreateStreamRequest.builder().streamName(streamName).shardCount(1).build());
+
+            Deadline deadline = Deadline.fromNow(Duration.ofMinutes(1));
+            while (!rateLimiter.getWhenReady(() -> streamExists(streamName))) {
+                if (deadline.isOverdue()) {
+                    throw new RuntimeException("Failed to create stream within time");
+                }
+            }
+        }
+
+        /** Returns the string payloads {@code "0"} to {@code count - 1} in ascending order. */
+        private List<String> expectedRecords(int count) {
+            return IntStream.range(0, count).mapToObj(String::valueOf).collect(Collectors.toList());
+        }
+
+        /** Writes {@code numRecords} UTF-8 encoded payloads from {@code expectedRecords}. */
+        private void putRecords(String streamName, int numRecords) {
+            List<byte[]> messages =
+                    expectedRecords(numRecords).stream()
+                            .map(record -> record.getBytes(StandardCharsets.UTF_8))
+                            .collect(Collectors.toList());
+
+            // PutRecords accepts at most 500 records per request.
+            for (List<byte[]> partition : Lists.partition(messages, 500)) {
+                List<PutRecordsRequestEntry> entries =
+                        partition.stream()
+                                .map(
+                                        msg ->
+                                                PutRecordsRequestEntry.builder()
+                                                        .partitionKey("fakePartitionKey")
+                                                        .data(SdkBytes.fromByteArray(msg))
+                                                        .build())
+                                .collect(Collectors.toList());
+                PutRecordsRequest requests =
+                        PutRecordsRequest.builder().streamName(streamName).records(entries).build();
+                PutRecordsResponse putRecordResult = kinesisClient.putRecords(requests);
+                // PutRecords reports per-record failures in the response instead of throwing;
+                // fail fast here rather than letting executeAndCollect wait for missing records.
+                Assertions.assertThat(putRecordResult.failedRecordCount())
+                        .as("failed records in PutRecords response for stream %s", streamName)
+                        .isZero();
+                for (PutRecordsResultEntry result : putRecordResult.records()) {
+                    LOG.debug("Added record: {}", result.sequenceNumber());
+                }
+            }
+        }
+
+        /** Returns whether {@code streamName} exists and is ACTIVE; any lookup error counts as no. */
+        private boolean streamExists(final String streamName) {
+            try {
+                return kinesisClient
+                                .describeStream(
+                                        DescribeStreamRequest.builder()
+                                                .streamName(streamName)
+                                                .build())
+                                .streamDescription()
+                                .streamStatus()
+                        == StreamStatus.ACTIVE;
+            } catch (Exception ignored) {
+                // Deliberately best-effort: the caller keeps polling until its deadline expires.
+                return false;
+            }
+        }
+    }
+}