This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0e908be6e5344f6220a1329580d9e4535cd6ceb6 Author: Zichen Liu <[email protected]> AuthorDate: Thu Dec 23 11:57:09 2021 +0000 [FLINK-24228][connectors/firehose] Amazon Kinesis Data Firehose sink based on the Async Sink Base implemented: - Refactored AWSUnifiedSinksUtil into a class that caters for Kinesis and Firehose - Extracted commonalities between KDS & KDF sinks into flink-connector-aws-base - Implemented integration test based on Localstack container - Changing host/container ports to be different, changing HTTP1.1 to being the default, localstack issue fixed - Added docs page, changed type in Firehose, turned logging off, removed unused dependencies. --- flink-connectors/flink-connector-aws-base/pom.xml | 21 +++ .../connector/aws/util/AWSAsyncSinkUtil.java} | 107 ++++++++------- .../aws/testutils/AWSServicesTestUtils.java | 129 ++++++++++++++++++ .../aws/testutils/LocalstackContainer.java | 78 +++++++++++ .../connector/aws/util/AWSAsyncSinkUtilTest.java} | 150 ++++++++++++--------- .../pom.xml | 12 -- .../KinesisDataStreamsConfigConstants.java} | 9 +- .../kinesis/sink/KinesisDataStreamsSinkWriter.java | 10 +- .../src/test/resources/log4j2-test.properties | 2 +- .../pom.xml | 43 +++--- .../sink/KinesisFirehoseConfigConstants.java} | 17 ++- .../firehose/sink/KinesisFirehoseException.java | 54 ++++++++ .../firehose/sink/KinesisFirehoseSink.java | 115 ++++++++++++++++ .../firehose/sink/KinesisFirehoseSinkBuilder.java | 149 ++++++++++++++++++++ .../sink/KinesisFirehoseSinkElementConverter.java | 76 +++++++++++ .../firehose/sink/KinesisFirehoseSinkWriter.java} | 120 +++++++++-------- .../src/main/resources/log4j2.properties | 25 ++++ .../sink/KinesisFirehoseSinkBuilderTest.java | 70 ++++++++++ .../KinesisFirehoseSinkElementConverterTest.java | 54 ++++++++ .../firehose/sink/KinesisFirehoseSinkITCase.java | 124 +++++++++++++++++ .../firehose/sink/KinesisFirehoseSinkTest.java | 76 +++++++++++ .../sink/KinesisFirehoseSinkWriterTest.java | 107 +++++++++++++++ 
.../firehose/sink/examples/SinkIntoFirehose.java | 74 ++++++++++ .../sink/testutils/KinesisFirehoseTestUtils.java | 73 ++++++++++ .../src/test/resources/log4j2-test.properties | 2 +- flink-connectors/flink-connector-base/pom.xml | 16 +++ .../base/sink/writer/AsyncSinkWriterTest.java | 120 +---------------- .../base/sink/writer/TestSinkInitContext.java | 139 +++++++++++++++++++ .../connectors/kinesis/proxy/KinesisProxyV2.java | 4 +- .../kinesis/proxy/KinesisProxyV2Factory.java | 18 ++- .../streaming/connectors/kinesis/util/AWSUtil.java | 11 +- .../connectors/kinesis/util/AwsV2Util.java | 8 ++ .../connectors/kinesis/util/AwsV2UtilTest.java | 34 +++++ flink-connectors/pom.xml | 1 + .../org/apache/flink/util/DockerImageVersions.java | 2 + tools/ci/stage.sh | 1 + 36 files changed, 1697 insertions(+), 354 deletions(-) diff --git a/flink-connectors/flink-connector-aws-base/pom.xml b/flink-connectors/flink-connector-aws-base/pom.xml index b774b2e..870e487 100644 --- a/flink-connectors/flink-connector-aws-base/pom.xml +++ b/flink-connectors/flink-connector-aws-base/pom.xml @@ -63,6 +63,27 @@ under the License. 
<artifactId>sts</artifactId> <version>${aws.sdk.version}</version> </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>s3</artifactId> + <version>${aws.sdk.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>iam</artifactId> + <version>${aws.sdk.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtil.java b/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java similarity index 61% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtil.java rename to flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java index 1ec75aa..1256142 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtil.java +++ b/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java @@ -15,24 +15,21 @@ * limitations under the License. 
*/ -package org.apache.flink.connector.kinesis.util; +package org.apache.flink.connector.aws.util; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.connector.aws.config.AWSConfigConstants; -import org.apache.flink.connector.aws.util.AWSGeneralUtil; -import org.apache.flink.connector.kinesis.config.AWSKinesisDataStreamsConfigConstants; import org.apache.flink.runtime.util.EnvironmentInformation; +import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder; +import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder; +import software.amazon.awssdk.core.SdkClient; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; import software.amazon.awssdk.core.client.config.SdkClientConfiguration; import software.amazon.awssdk.core.client.config.SdkClientOption; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; -import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder; -import software.amazon.awssdk.services.kinesis.model.LimitExceededException; -import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; import java.net.URI; import java.util.Optional; @@ -40,11 +37,10 @@ import java.util.Properties; /** Some utilities specific to Amazon Web Service. */ @Internal -public class AWSKinesisDataStreamsUtil extends AWSGeneralUtil { +public class AWSAsyncSinkUtil extends AWSGeneralUtil { - /** Used for formatting Flink-specific user agent string when creating Kinesis client. */ - private static final String USER_AGENT_FORMAT = - AWSKinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT + " V2"; + /** V2 suffix to denote the unified sinks. V1 sinks are based on KPL etc. 
*/ + static final String V2_USER_AGENT_SUFFIX = " V2"; /** * Creates a user agent prefix for Flink. This can be used by HTTP Clients. @@ -62,51 +58,61 @@ public class AWSKinesisDataStreamsUtil extends AWSGeneralUtil { /** * @param configProps configuration properties - * @param httpClient the underlying HTTP client used to talk to Kinesis - * @return a new Amazon Kinesis Client + * @param httpClient the underlying HTTP client used to talk to AWS + * @return a new AWS Client */ - public static KinesisAsyncClient createKinesisAsyncClient( - final Properties configProps, final SdkAsyncHttpClient httpClient) { + public static < + S extends SdkClient, + T extends + AwsAsyncClientBuilder<? extends T, S> + & AwsClientBuilder<? extends T, S>> + S createAwsAsyncClient( + final Properties configProps, + final SdkAsyncHttpClient httpClient, + final T clientBuilder, + final String awsUserAgentPrefixFormat, + final String awsClientUserAgentPrefix) { SdkClientConfiguration clientConfiguration = SdkClientConfiguration.builder().build(); - return createKinesisAsyncClient(configProps, clientConfiguration, httpClient); + return createAwsAsyncClient( + configProps, + clientConfiguration, + httpClient, + clientBuilder, + awsUserAgentPrefixFormat, + awsClientUserAgentPrefix); } /** * @param configProps configuration properties * @param clientConfiguration the AWS SDK v2 config to instantiate the client - * @param httpClient the underlying HTTP client used to talk to Kinesis - * @return a new Amazon Kinesis Client + * @param httpClient the underlying HTTP client used to talk to AWS + * @return a new AWS Client */ - public static KinesisAsyncClient createKinesisAsyncClient( - final Properties configProps, - final SdkClientConfiguration clientConfiguration, - final SdkAsyncHttpClient httpClient) { + public static < + S extends SdkClient, + T extends + AwsAsyncClientBuilder<? extends T, S> + & AwsClientBuilder<? 
extends T, S>> + S createAwsAsyncClient( + final Properties configProps, + final SdkClientConfiguration clientConfiguration, + final SdkAsyncHttpClient httpClient, + final T clientBuilder, + final String awsUserAgentPrefixFormat, + final String awsClientUserAgentPrefix) { String flinkUserAgentPrefix = - Optional.ofNullable( - configProps.getProperty( - AWSKinesisDataStreamsConfigConstants - .KINESIS_CLIENT_USER_AGENT_PREFIX)) - .orElse(formatFlinkUserAgentPrefix(USER_AGENT_FORMAT)); + Optional.ofNullable(configProps.getProperty(awsClientUserAgentPrefix)) + .orElse( + formatFlinkUserAgentPrefix( + awsUserAgentPrefixFormat + V2_USER_AGENT_SUFFIX)); final ClientOverrideConfiguration overrideConfiguration = createClientOverrideConfiguration( clientConfiguration, ClientOverrideConfiguration.builder(), flinkUserAgentPrefix); - final KinesisAsyncClientBuilder clientBuilder = KinesisAsyncClient.builder(); - return createKinesisAsyncClient( - configProps, clientBuilder, httpClient, overrideConfiguration); - } - - @VisibleForTesting - static ClientOverrideConfiguration createClientOverrideConfiguration( - final SdkClientConfiguration config, - final ClientOverrideConfiguration.Builder overrideConfigurationBuilder) { - return createClientOverrideConfiguration( - config, - overrideConfigurationBuilder, - formatFlinkUserAgentPrefix(USER_AGENT_FORMAT)); + return createAwsAsyncClient(configProps, clientBuilder, httpClient, overrideConfiguration); } @VisibleForTesting @@ -131,11 +137,16 @@ public class AWSKinesisDataStreamsUtil extends AWSGeneralUtil { } @VisibleForTesting - static KinesisAsyncClient createKinesisAsyncClient( - final Properties configProps, - final KinesisAsyncClientBuilder clientBuilder, - final SdkAsyncHttpClient httpClient, - final ClientOverrideConfiguration overrideConfiguration) { + static < + S extends SdkClient, + T extends + AwsAsyncClientBuilder<? extends T, S> + & AwsClientBuilder<? 
extends T, S>> + S createAwsAsyncClient( + final Properties configProps, + final T clientBuilder, + final SdkAsyncHttpClient httpClient, + final ClientOverrideConfiguration overrideConfiguration) { if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) { final URI endpointOverride = @@ -150,10 +161,4 @@ public class AWSKinesisDataStreamsUtil extends AWSGeneralUtil { .region(getRegion(configProps)) .build(); } - - public static boolean isRecoverableException(Exception e) { - Throwable cause = e.getCause(); - return cause instanceof LimitExceededException - || cause instanceof ProvisionedThroughputExceededException; - } } diff --git a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java new file mode 100644 index 0000000..543d81b --- /dev/null +++ b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.aws.testutils; + +import org.apache.flink.connector.aws.config.AWSConfigConstants; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; + +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.waiters.WaiterResponse; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.iam.IamAsyncClient; +import software.amazon.awssdk.services.iam.model.CreateRoleRequest; +import software.amazon.awssdk.services.iam.model.CreateRoleResponse; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.HeadBucketRequest; +import software.amazon.awssdk.services.s3.model.HeadBucketResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.services.s3.waiters.S3AsyncWaiter; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER; +import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT; +import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION; +import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES; + +/** + * A set of static methods that can be used to call common AWS services on the Localstack container. 
+ */ +public class AWSServicesTestUtils { + + private static final String ACCESS_KEY_ID = "accessKeyId"; + private static final String SECRET_ACCESS_KEY = "secretAccessKey"; + + public static S3AsyncClient getS3Client(String endpoint) throws URISyntaxException { + return S3AsyncClient.builder() + .httpClient(getHttpClient(endpoint)) + .region(Region.AP_SOUTHEAST_1) + .endpointOverride(new URI(endpoint)) + .credentialsProvider(getDefaultCredentials()) + .build(); + } + + public static IamAsyncClient getIamClient(String endpoint) throws URISyntaxException { + return IamAsyncClient.builder() + .httpClient(getHttpClient(endpoint)) + .region(Region.AWS_GLOBAL) + .endpointOverride(new URI(endpoint)) + .credentialsProvider(getDefaultCredentials()) + .build(); + } + + public static AwsCredentialsProvider getDefaultCredentials() { + return StaticCredentialsProvider.create( + AwsBasicCredentials.create(ACCESS_KEY_ID, SECRET_ACCESS_KEY)); + } + + public static Properties getConfig(String endpoint) { + Properties config = new Properties(); + config.setProperty(AWS_REGION, Region.AP_SOUTHEAST_1.toString()); + config.setProperty(AWS_ENDPOINT, endpoint); + config.setProperty(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), ACCESS_KEY_ID); + config.setProperty( + AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), SECRET_ACCESS_KEY); + config.setProperty(TRUST_ALL_CERTIFICATES, "true"); + return config; + } + + public static SdkAsyncHttpClient getHttpClient(String endpoint) { + return AWSGeneralUtil.createAsyncHttpClient(getConfig(endpoint)); + } + + public static void createBucket(S3AsyncClient s3Client, String bucketName) + throws ExecutionException, InterruptedException { + CreateBucketRequest bucketRequest = + CreateBucketRequest.builder().bucket(bucketName).build(); + s3Client.createBucket(bucketRequest); + + HeadBucketRequest bucketRequestWait = + HeadBucketRequest.builder().bucket(bucketName).build(); + + S3AsyncWaiter s3Waiter = s3Client.waiter(); + 
CompletableFuture<WaiterResponse<HeadBucketResponse>> waiterResponseFuture = + s3Waiter.waitUntilBucketExists(bucketRequestWait); + + waiterResponseFuture.get(); + } + + public static void createIAMRole(IamAsyncClient iam, String roleName) + throws ExecutionException, InterruptedException { + CreateRoleRequest request = CreateRoleRequest.builder().roleName(roleName).build(); + + CompletableFuture<CreateRoleResponse> responseFuture = iam.createRole(request); + responseFuture.get(); + } + + public static List<S3Object> listBucketObjects(S3AsyncClient s3, String bucketName) + throws ExecutionException, InterruptedException { + ListObjectsRequest listObjects = ListObjectsRequest.builder().bucket(bucketName).build(); + CompletableFuture<ListObjectsResponse> res = s3.listObjects(listObjects); + return res.get().contents(); + } +} diff --git a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/LocalstackContainer.java b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/LocalstackContainer.java new file mode 100644 index 0000000..bdc0517 --- /dev/null +++ b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/LocalstackContainer.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.aws.testutils; + +import org.rnorth.ducttape.ratelimits.RateLimiter; +import org.rnorth.ducttape.ratelimits.RateLimiterBuilder; +import org.rnorth.ducttape.unreliables.Unreliables; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.S3Object; + +import java.net.URISyntaxException; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * A class wrapping the Localstack container that provides mock implementations of many common AWS + * services. + */ +public class LocalstackContainer extends GenericContainer<LocalstackContainer> { + + private static final int CONTAINER_PORT = 4566; + + public LocalstackContainer(DockerImageName imageName) { + super(imageName); + withExposedPorts(CONTAINER_PORT); + waitingFor(new ListBucketObjectsWaitStrategy()); + } + + public String getEndpoint() { + return String.format("https://%s:%s", getHost(), getMappedPort(CONTAINER_PORT)); + } + + private class ListBucketObjectsWaitStrategy extends AbstractWaitStrategy { + private static final int TRANSACTIONS_PER_SECOND = 1; + + private final RateLimiter rateLimiter = + RateLimiterBuilder.newBuilder() + .withRate(TRANSACTIONS_PER_SECOND, SECONDS) + .withConstantThroughput() + .build(); + + @Override + protected void waitUntilReady() { + Unreliables.retryUntilSuccess( + (int) startupTimeout.getSeconds(), + SECONDS, + () -> rateLimiter.getWhenReady(this::list)); + } + + private List<S3Object> list() + throws ExecutionException, InterruptedException, URISyntaxException { + String bucketName = "bucket-name-not-to-be-used"; + 
S3AsyncClient client = AWSServicesTestUtils.getS3Client(getEndpoint()); + AWSServicesTestUtils.createBucket(client, bucketName); + return AWSServicesTestUtils.listBucketObjects(client, bucketName); + } + } +} diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtilTest.java b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtilTest.java similarity index 62% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtilTest.java rename to flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtilTest.java index 0d6eae8..ccecdb8 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtilTest.java +++ b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtilTest.java @@ -15,13 +15,15 @@ * limitations under the License. 
*/ -package org.apache.flink.connector.kinesis.util; - -import org.apache.flink.connector.aws.util.TestUtil; -import org.apache.flink.connector.kinesis.config.AWSKinesisDataStreamsConfigConstants; +package org.apache.flink.connector.aws.util; import org.junit.Test; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder; +import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder; +import software.amazon.awssdk.core.SdkClient; +import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; import software.amazon.awssdk.core.client.config.SdkClientConfiguration; @@ -29,18 +31,14 @@ import software.amazon.awssdk.core.client.config.SdkClientOption; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder; -import software.amazon.awssdk.services.kinesis.model.LimitExceededException; import java.net.URI; import java.time.Duration; import java.util.Properties; -import java.util.concurrent.ExecutionException; import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT; import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.apache.flink.connector.aws.util.AWSAsyncSinkUtil.formatFlinkUserAgentPrefix; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; @@ -48,20 +46,23 @@ import static 
org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -/** Tests for {@link AWSKinesisDataStreamsUtil}. */ -public class AWSKinesisDataStreamsUtilTest { +/** Tests for {@link AWSAsyncSinkUtil}. */ +public class AWSAsyncSinkUtilTest { + private static final String DEFAULT_USER_AGENT_PREFIX_FORMAT = - AWSKinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT + " V2"; + "Apache Flink %s (%s) *Destination* Connector"; + private static final String DEFAULT_USER_AGENT_PREFIX_FORMAT_V2 = + "Apache Flink %s (%s) *Destination* Connector V2"; @Test public void testCreateKinesisAsyncClient() { Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2"); - KinesisAsyncClientBuilder builder = mockKinesisAsyncClientBuilder(); + MockAsyncClientBuilder builder = mockKinesisAsyncClientBuilder(); ClientOverrideConfiguration clientOverrideConfiguration = ClientOverrideConfiguration.builder().build(); SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder().build(); - AWSKinesisDataStreamsUtil.createKinesisAsyncClient( + AWSAsyncSinkUtil.createAwsAsyncClient( properties, builder, httpClient, clientOverrideConfiguration); verify(builder).overrideConfiguration(clientOverrideConfiguration); @@ -77,12 +78,12 @@ public class AWSKinesisDataStreamsUtilTest { Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2"); properties.setProperty(AWS_ENDPOINT, "https://localhost"); - KinesisAsyncClientBuilder builder = mockKinesisAsyncClientBuilder(); + MockAsyncClientBuilder builder = mockKinesisAsyncClientBuilder(); ClientOverrideConfiguration clientOverrideConfiguration = ClientOverrideConfiguration.builder().build(); SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder().build(); - AWSKinesisDataStreamsUtil.createKinesisAsyncClient( + AWSAsyncSinkUtil.createAwsAsyncClient( properties, builder, httpClient, clientOverrideConfiguration); 
verify(builder).endpointOverride(URI.create("https://localhost")); @@ -94,14 +95,17 @@ public class AWSKinesisDataStreamsUtilTest { ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder(); - AWSKinesisDataStreamsUtil.createClientOverrideConfiguration(clientConfiguration, builder); + AWSAsyncSinkUtil.createClientOverrideConfiguration( + clientConfiguration, + builder, + formatFlinkUserAgentPrefix( + DEFAULT_USER_AGENT_PREFIX_FORMAT + AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX)); verify(builder).build(); verify(builder) .putAdvancedOption( SdkAdvancedClientOption.USER_AGENT_PREFIX, - AWSKinesisDataStreamsUtil.formatFlinkUserAgentPrefix( - DEFAULT_USER_AGENT_PREFIX_FORMAT)); + formatFlinkUserAgentPrefix(DEFAULT_USER_AGENT_PREFIX_FORMAT_V2)); verify(builder).putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, null); verify(builder, never()).apiCallAttemptTimeout(any()); verify(builder, never()).apiCallTimeout(any()); @@ -116,7 +120,11 @@ public class AWSKinesisDataStreamsUtilTest { ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder(); - AWSKinesisDataStreamsUtil.createClientOverrideConfiguration(clientConfiguration, builder); + AWSAsyncSinkUtil.createClientOverrideConfiguration( + clientConfiguration, + builder, + formatFlinkUserAgentPrefix( + DEFAULT_USER_AGENT_PREFIX_FORMAT + AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX)); verify(builder).putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, "suffix"); } @@ -130,7 +138,12 @@ public class AWSKinesisDataStreamsUtilTest { ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder(); - AWSKinesisDataStreamsUtil.createClientOverrideConfiguration(clientConfiguration, builder); + AWSAsyncSinkUtil.createClientOverrideConfiguration( + clientConfiguration, + builder, + formatFlinkUserAgentPrefix( + DEFAULT_USER_AGENT_PREFIX_FORMAT_V2 + + AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX)); 
verify(builder).apiCallAttemptTimeout(Duration.ofMillis(500)); } @@ -144,49 +157,18 @@ public class AWSKinesisDataStreamsUtilTest { ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder(); - AWSKinesisDataStreamsUtil.createClientOverrideConfiguration(clientConfiguration, builder); + AWSAsyncSinkUtil.createClientOverrideConfiguration( + clientConfiguration, + builder, + formatFlinkUserAgentPrefix( + DEFAULT_USER_AGENT_PREFIX_FORMAT_V2 + + AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX)); verify(builder).apiCallTimeout(Duration.ofMillis(600)); } - @Test - public void testIsRecoverableExceptionForRecoverable() { - Exception recoverable = LimitExceededException.builder().build(); - assertTrue( - AWSKinesisDataStreamsUtil.isRecoverableException( - new ExecutionException(recoverable))); - } - - @Test - public void testIsRecoverableExceptionForNonRecoverable() { - Exception nonRecoverable = new IllegalArgumentException("abc"); - assertFalse( - AWSKinesisDataStreamsUtil.isRecoverableException( - new ExecutionException(nonRecoverable))); - } - - @Test - public void testIsRecoverableExceptionForRuntimeExceptionWrappingRecoverable() { - Exception recoverable = LimitExceededException.builder().build(); - Exception runtime = new RuntimeException("abc", recoverable); - assertTrue(AWSKinesisDataStreamsUtil.isRecoverableException(runtime)); - } - - @Test - public void testIsRecoverableExceptionForRuntimeExceptionWrappingNonRecoverable() { - Exception nonRecoverable = new IllegalArgumentException("abc"); - Exception runtime = new RuntimeException("abc", nonRecoverable); - assertFalse(AWSKinesisDataStreamsUtil.isRecoverableException(runtime)); - } - - @Test - public void testIsRecoverableExceptionForNullCause() { - Exception nonRecoverable = new IllegalArgumentException("abc"); - assertFalse(AWSKinesisDataStreamsUtil.isRecoverableException(nonRecoverable)); - } - - private KinesisAsyncClientBuilder mockKinesisAsyncClientBuilder() { - KinesisAsyncClientBuilder 
builder = mock(KinesisAsyncClientBuilder.class); + private MockAsyncClientBuilder mockKinesisAsyncClientBuilder() { + MockAsyncClientBuilder builder = mock(MockAsyncClientBuilder.class); when(builder.overrideConfiguration(any(ClientOverrideConfiguration.class))) .thenReturn(builder); when(builder.httpClient(any())).thenReturn(builder); @@ -205,4 +187,52 @@ public class AWSKinesisDataStreamsUtilTest { return builder; } + + private static class MockAsyncClientBuilder + implements AwsAsyncClientBuilder<MockAsyncClientBuilder, SdkClient>, + AwsClientBuilder<MockAsyncClientBuilder, SdkClient> { + + @Override + public MockAsyncClientBuilder asyncConfiguration( + ClientAsyncConfiguration clientAsyncConfiguration) { + return null; + } + + @Override + public MockAsyncClientBuilder httpClient(SdkAsyncHttpClient sdkAsyncHttpClient) { + return null; + } + + @Override + public MockAsyncClientBuilder httpClientBuilder(SdkAsyncHttpClient.Builder builder) { + return null; + } + + @Override + public MockAsyncClientBuilder credentialsProvider( + AwsCredentialsProvider awsCredentialsProvider) { + return null; + } + + @Override + public MockAsyncClientBuilder region(Region region) { + return null; + } + + @Override + public MockAsyncClientBuilder overrideConfiguration( + ClientOverrideConfiguration clientOverrideConfiguration) { + return null; + } + + @Override + public MockAsyncClientBuilder endpointOverride(URI uri) { + return null; + } + + @Override + public SdkClient build() { + return null; + } + } } diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml b/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml index 235fc3c..be50a2e 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml +++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml @@ -70,18 +70,6 @@ under the License. 
<version>${aws.sdk.version}</version> </dependency> - <dependency> - <groupId>software.amazon.awssdk</groupId> - <artifactId>netty-nio-client</artifactId> - <version>${aws.sdk.version}</version> - </dependency> - - <dependency> - <groupId>software.amazon.awssdk</groupId> - <artifactId>sts</artifactId> - <version>${aws.sdk.version}</version> - </dependency> - <!--Table API dependencies--> <dependency> <groupId>org.apache.flink</groupId> diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsConfigConstants.java similarity index 80% copy from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java copy to flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsConfigConstants.java index b318292..a5a6020 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java +++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsConfigConstants.java @@ -15,19 +15,18 @@ * limitations under the License. */ -package org.apache.flink.connector.kinesis.config; +package org.apache.flink.connector.kinesis.sink; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil; -/** Defaults for {@link AWSKinesisDataStreamsUtil}. */ +/** Defaults for {@link KinesisDataStreamsSinkWriter}. 
*/ @PublicEvolving -public class AWSKinesisDataStreamsConfigConstants { +public class KinesisDataStreamsConfigConstants { public static final String BASE_KINESIS_USER_AGENT_PREFIX_FORMAT = "Apache Flink %s (%s) Kinesis Connector"; - /** Identifier for user agent prefix. */ + /** Kinesis identifier for user agent prefix. */ public static final String KINESIS_CLIENT_USER_AGENT_PREFIX = "aws.kinesis.client.user-agent-prefix"; } diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java index b720e5e..b291c12 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java +++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java @@ -18,10 +18,10 @@ package org.apache.flink.connector.kinesis.sink; import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil; import org.apache.flink.connector.aws.util.AWSGeneralUtil; import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; import org.apache.flink.connector.base.sink.writer.ElementConverter; -import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; @@ -104,8 +104,12 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe final SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties); - return AWSKinesisDataStreamsUtil.createKinesisAsyncClient( - kinesisClientProperties, httpClient); + return AWSAsyncSinkUtil.createAwsAsyncClient( + 
kinesisClientProperties, + httpClient, + KinesisAsyncClient.builder(), + KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT, + KinesisDataStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX); } @Override diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties index c5339e7..c4fa187 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties +++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties @@ -18,7 +18,7 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes -rootLogger.level = INFO +rootLogger.level = OFF rootLogger.appenderRef.test.ref = TestLogger appender.testlogger.name = TestLogger diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml b/flink-connectors/flink-connector-aws-kinesis-firehose/pom.xml similarity index 83% copy from flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml copy to flink-connectors/flink-connector-aws-kinesis-firehose/pom.xml index 235fc3c..d391616 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/pom.xml @@ -30,8 +30,8 @@ under the License. <relativePath>..</relativePath> </parent> - <artifactId>flink-connector-aws-kinesis-data-streams</artifactId> - <name>Flink : Connectors : AWS Kinesis Data Streams</name> + <artifactId>flink-connector-aws-kinesis-firehose</artifactId> + <name>Flink : Connectors : AWS Kinesis Data Firehose</name> <properties> <aws.sdk.version>2.17.52</aws.sdk.version> </properties> @@ -66,7 +66,7 @@ under the License. 
<dependency> <groupId>software.amazon.awssdk</groupId> - <artifactId>kinesis</artifactId> + <artifactId>firehose</artifactId> <version>${aws.sdk.version}</version> </dependency> @@ -76,19 +76,6 @@ under the License. <version>${aws.sdk.version}</version> </dependency> - <dependency> - <groupId>software.amazon.awssdk</groupId> - <artifactId>sts</artifactId> - <version>${aws.sdk.version}</version> - </dependency> - - <!--Table API dependencies--> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-common</artifactId> - <version>${project.version}</version> - </dependency> - <!-- Test dependencies --> <dependency> <groupId>org.apache.flink</groupId> @@ -105,28 +92,34 @@ under the License. <scope>test</scope> </dependency> - <!-- Kinesis table factory testing --> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-common</artifactId> + <artifactId>flink-connector-base</artifactId> <version>${project.version}</version> <type>test-jar</type> <scope>test</scope> </dependency> + <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> <scope>test</scope> </dependency> <dependency> - <groupId>org.testcontainers</groupId> - <artifactId>testcontainers</artifactId> + <groupId>software.amazon.awssdk</groupId> + <artifactId>s3</artifactId> + <version>${aws.sdk.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>iam</artifactId> + <version>${aws.sdk.version}</version> <scope>test</scope> </dependency> - </dependencies> + </dependencies> <build> <plugins> diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseConfigConstants.java similarity index 62% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java rename to flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseConfigConstants.java index b318292..527f74a 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseConfigConstants.java @@ -15,19 +15,18 @@ * limitations under the License. */ -package org.apache.flink.connector.kinesis.config; +package org.apache.flink.connector.firehose.sink; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil; -/** Defaults for {@link AWSKinesisDataStreamsUtil}. */ +/** Defaults for {@link KinesisFirehoseSinkWriter}. */ @PublicEvolving -public class AWSKinesisDataStreamsConfigConstants { +public class KinesisFirehoseConfigConstants { - public static final String BASE_KINESIS_USER_AGENT_PREFIX_FORMAT = - "Apache Flink %s (%s) Kinesis Connector"; + public static final String BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT = + "Apache Flink %s (%s) Firehose Connector"; - /** Identifier for user agent prefix. */ - public static final String KINESIS_CLIENT_USER_AGENT_PREFIX = - "aws.kinesis.client.user-agent-prefix"; + /** Firehose identifier for user agent prefix. 
*/ + public static final String FIREHOSE_CLIENT_USER_AGENT_PREFIX = + "aws.firehose.client.user-agent-prefix"; } diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseException.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseException.java new file mode 100644 index 0000000..ff6918c --- /dev/null +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseException.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.firehose.sink; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * A {@link RuntimeException} wrapper indicating the exception was thrown from the Kinesis Data + * Firehose Sink. 
+ */ +@PublicEvolving +class KinesisFirehoseException extends RuntimeException { + + public KinesisFirehoseException(final String message) { + super(message); + } + + public KinesisFirehoseException(final String message, final Throwable cause) { + super(message, cause); + } + + /** + * When the flag {@code failOnError} is set in {@link KinesisFirehoseSinkWriter}, this exception + * is raised as soon as any exception occurs when writing to KDF. + */ + static class KinesisFirehoseFailFastException extends KinesisFirehoseException { + + public KinesisFirehoseFailFastException() { + super( + "Encountered an exception while persisting records, not retrying due to {failOnError} being set."); + } + + public KinesisFirehoseFailFastException(final Throwable cause) { + super( + "Encountered an exception while persisting records, not retrying due to {failOnError} being set.", + cause); + } + } +} diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java new file mode 100644 index 0000000..50f04d4 --- /dev/null +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.firehose.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import software.amazon.awssdk.services.firehose.model.Record; + +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +/** + * A Kinesis Data Firehose (KDF) Sink that performs async requests against a destination delivery + * stream using the buffering protocol specified in {@link AsyncSinkBase}. + * + * <p>The sink internally uses a {@link + * software.amazon.awssdk.services.firehose.FirehoseAsyncClient} to communicate with the AWS + * endpoint. 
+ * + * <p>Please see the writer implementation in {@link KinesisFirehoseSinkWriter} + * + * @param <InputT> Type of the elements handled by this sink + */ +@PublicEvolving +public class KinesisFirehoseSink<InputT> extends AsyncSinkBase<InputT, Record> { + + private final boolean failOnError; + private final String deliveryStreamName; + private final Properties firehoseClientProperties; + + KinesisFirehoseSink( + ElementConverter<InputT, Record> elementConverter, + Integer maxBatchSize, + Integer maxInFlightRequests, + Integer maxBufferedRequests, + Long maxBatchSizeInBytes, + Long maxTimeInBufferMS, + Long maxRecordSizeInBytes, + boolean failOnError, + String deliveryStreamName, + Properties firehoseClientProperties) { + super( + elementConverter, + maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBatchSizeInBytes, + maxTimeInBufferMS, + maxRecordSizeInBytes); + this.deliveryStreamName = + Preconditions.checkNotNull( + deliveryStreamName, + "The delivery stream name must not be null when initializing the KDF Sink."); + Preconditions.checkArgument( + !this.deliveryStreamName.isEmpty(), + "The delivery stream name must be set when initializing the KDF Sink."); + this.failOnError = failOnError; + this.firehoseClientProperties = firehoseClientProperties; + } + + /** + * Create a {@link KinesisFirehoseSinkBuilder} to allow the fluent construction of a new {@code + * KinesisFirehoseSink}. 
+ * + * @param <InputT> type of incoming records + * @return {@link KinesisFirehoseSinkBuilder} + */ + public static <InputT> KinesisFirehoseSinkBuilder<InputT> builder() { + return new KinesisFirehoseSinkBuilder<>(); + } + + @Override + public SinkWriter<InputT, Void, Collection<Record>> createWriter( + InitContext context, List<Collection<Record>> states) { + return new KinesisFirehoseSinkWriter<>( + getElementConverter(), + context, + getMaxBatchSize(), + getMaxInFlightRequests(), + getMaxBufferedRequests(), + getMaxBatchSizeInBytes(), + getMaxTimeInBufferMS(), + getMaxRecordSizeInBytes(), + failOnError, + deliveryStreamName, + firehoseClientProperties); + } + + @Override + public Optional<SimpleVersionedSerializer<Collection<Record>>> getWriterStateSerializer() { + return Optional.empty(); + } +} diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java new file mode 100644 index 0000000..ee22e1f --- /dev/null +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.firehose.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; + +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.services.firehose.model.Record; + +import java.util.Optional; +import java.util.Properties; + +import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION; +import static software.amazon.awssdk.http.Protocol.HTTP1_1; + +/** + * Builder to construct {@link KinesisFirehoseSink}. + * + * <p>The following example shows the minimum setup to create a {@link KinesisFirehoseSink} that + * writes String values to a Kinesis Data Firehose delivery stream named delivery-stream-name. 
+ * + * <pre>{@code + * private static final KinesisFirehoseSinkElementConverter<String> elementConverter = + * KinesisFirehoseSinkElementConverter.<String>builder() + * .setSerializationSchema(new SimpleStringSchema()) + * .build(); + * + * Properties sinkProperties = new Properties(); + * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); + * + * KinesisFirehoseSink<String> kdfSink = + * KinesisFirehoseSink.<String>builder() + * .setElementConverter(elementConverter) + * .setDeliveryStreamName("delivery-stream-name") + * .setMaxBatchSize(20) + * .setFirehoseClientProperties(sinkProperties) + * .build(); + * }</pre> + * + * <p>If the following parameters are not set in this builder, the following defaults will be used: + * + * <ul> + * <li>{@code maxBatchSize} will be 500 + * <li>{@code maxInFlightRequests} will be 50 + * <li>{@code maxBufferedRequests} will be 10000 + * <li>{@code maxBatchSizeInBytes} will be 4 MB i.e. {@code 4 * 1024 * 1024} + * <li>{@code maxTimeInBufferMS} will be 5000ms + * <li>{@code maxRecordSizeInBytes} will be 1000 KB i.e. 
{@code 1000 * 1024} + * <li>{@code failOnError} will be false + * </ul> + * + * @param <InputT> type of elements that should be persisted in the destination + */ +@PublicEvolving +public class KinesisFirehoseSinkBuilder<InputT> + extends AsyncSinkBaseBuilder<InputT, Record, KinesisFirehoseSinkBuilder<InputT>> { + + private static final int DEFAULT_MAX_BATCH_SIZE = 500; + private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50; + private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10000; + private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 4 * 1024 * 1024; + private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000; + private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 1000 * 1024; + private static final boolean DEFAULT_FAIL_ON_ERROR = false; + private static final Protocol DEFAULT_HTTP_PROTOCOL = HTTP1_1; + + private Boolean failOnError; + private String deliveryStreamName; + private Properties firehoseClientProperties; + + KinesisFirehoseSinkBuilder() {} + + /** + * Sets the name of the KDF delivery stream that the sink will connect to. There is no default + * for this parameter, therefore, this must be provided at sink creation time otherwise the + * build will fail. + * + * @param deliveryStreamName the name of the delivery stream + * @return {@link KinesisFirehoseSinkBuilder} itself + */ + public KinesisFirehoseSinkBuilder<InputT> setDeliveryStreamName(String deliveryStreamName) { + this.deliveryStreamName = deliveryStreamName; + return this; + } + + /** + * If writing to Kinesis Data Firehose results in a partial or full failure being returned, the + * job will fail immediately with a {@link KinesisFirehoseException} if failOnError is set. 
+ * + * @param failOnError whether to fail on error + * @return {@link KinesisFirehoseSinkBuilder} itself + */ + public KinesisFirehoseSinkBuilder<InputT> setFailOnError(boolean failOnError) { + this.failOnError = failOnError; + return this; + } + + /** + * A set of properties used by the sink to create the firehose client. This may be used to set + * the aws region, credentials etc. See the docs for usage and syntax. + * + * @param firehoseClientProperties Firehose client properties + * @return {@link KinesisFirehoseSinkBuilder} itself + */ + public KinesisFirehoseSinkBuilder<InputT> setFirehoseClientProperties( + Properties firehoseClientProperties) { + this.firehoseClientProperties = firehoseClientProperties; + return this; + } + + private Properties getClientPropertiesWithDefaultHttpProtocol() { + Properties clientProperties = + Optional.ofNullable(firehoseClientProperties).orElse(new Properties()); + clientProperties.putIfAbsent(HTTP_PROTOCOL_VERSION, DEFAULT_HTTP_PROTOCOL.name()); + return clientProperties; + } + + @Override + public KinesisFirehoseSink<InputT> build() { + return new KinesisFirehoseSink<>( + getElementConverter(), + Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE), + Optional.ofNullable(getMaxInFlightRequests()) + .orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS), + Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS), + Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B), + Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS), + Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B), + Optional.ofNullable(failOnError).orElse(DEFAULT_FAIL_ON_ERROR), + deliveryStreamName, + getClientPropertiesWithDefaultHttpProtocol()); + } +} diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java new file mode 100644 index 0000000..45b4186 --- /dev/null +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.firehose.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.util.Preconditions; + +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.firehose.model.Record; + +/** + * An implementation of the {@link ElementConverter} that uses the AWS Kinesis SDK v2. The user only + * needs to provide a {@link SerializationSchema} of the {@code InputT} to transform it into a + * {@link Record} that may be persisted. 
+ */ +@PublicEvolving +public class KinesisFirehoseSinkElementConverter<InputT> + implements ElementConverter<InputT, Record> { + + /** A serialization schema to specify how the input element should be serialized. */ + private final SerializationSchema<InputT> serializationSchema; + + private KinesisFirehoseSinkElementConverter(SerializationSchema<InputT> serializationSchema) { + this.serializationSchema = serializationSchema; + } + + @Override + public Record apply(InputT element, SinkWriter.Context context) { + return Record.builder() + .data(SdkBytes.fromByteArray(serializationSchema.serialize(element))) + .build(); + } + + public static <InputT> Builder<InputT> builder() { + return new Builder<>(); + } + + /** A builder for the KinesisFirehoseSinkElementConverter. */ + @PublicEvolving + public static class Builder<InputT> { + + private SerializationSchema<InputT> serializationSchema; + + public Builder<InputT> setSerializationSchema( + SerializationSchema<InputT> serializationSchema) { + this.serializationSchema = serializationSchema; + return this; + } + + public KinesisFirehoseSinkElementConverter<InputT> build() { + Preconditions.checkNotNull( + serializationSchema, + "No SerializationSchema was supplied to the " + + "KinesisFirehoseSinkElementConverter builder."); + return new KinesisFirehoseSinkElementConverter<>(serializationSchema); + } + } +} diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java similarity index 54% copy from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java copy to 
flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java index b720e5e..4002ec3 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java @@ -15,27 +15,29 @@ * limitations under the License. */ -package org.apache.flink.connector.kinesis.sink; +package org.apache.flink.connector.firehose.sink; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil; import org.apache.flink.connector.aws.util.AWSGeneralUtil; import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; import org.apache.flink.connector.base.sink.writer.ElementConverter; -import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; -import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; -import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; -import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; -import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry; -import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.services.firehose.FirehoseAsyncClient; +import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest; +import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse; +import 
software.amazon.awssdk.services.firehose.model.PutRecordBatchResponseEntry; +import software.amazon.awssdk.services.firehose.model.Record; +import software.amazon.awssdk.services.firehose.model.ResourceNotFoundException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Properties; @@ -44,35 +46,36 @@ import java.util.concurrent.CompletionException; import java.util.function.Consumer; /** - * Sink writer created by {@link KinesisDataStreamsSink} to write to Kinesis Data Streams. More + * Sink writer created by {@link KinesisFirehoseSink} to write to Kinesis Data Firehose. More * details on the operation of this sink writer may be found in the doc for {@link - * KinesisDataStreamsSink}. More details on the internals of this sink writer may be found in {@link + * KinesisFirehoseSink}. More details on the internals of this sink writer may be found in {@link * AsyncSinkWriter}. * - * <p>The {@link KinesisAsyncClient} used here may be configured in the standard way for the AWS SDK - * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code + * <p>The {@link FirehoseAsyncClient} used here may be configured in the standard way for the AWS + * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code * AWS_SECRET_ACCESS_KEY} through environment variables etc. 
*/ -class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> { - private static final Logger LOG = LoggerFactory.getLogger(KinesisDataStreamsSinkWriter.class); +@Internal +class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record> { + private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkWriter.class); /* A counter for the total number of records that have encountered an error during put */ private final Counter numRecordsOutErrorsCounter; - /* Name of the stream in Kinesis Data Streams */ - private final String streamName; + /* Name of the delivery stream in Kinesis Data Firehose */ + private final String deliveryStreamName; /* The sink writer metric group */ private final SinkWriterMetricGroup metrics; - /* The asynchronous Kinesis client - construction is by kinesisClientProperties */ - private final KinesisAsyncClient client; + /* The asynchronous Firehose client - construction is by firehoseClientProperties */ + private final FirehoseAsyncClient client; /* Flag to whether fatally fail any time we encounter an exception when persisting records */ private final boolean failOnError; - KinesisDataStreamsSinkWriter( - ElementConverter<InputT, PutRecordsRequestEntry> elementConverter, + KinesisFirehoseSinkWriter( + ElementConverter<InputT, Record> elementConverter, Sink.InitContext context, int maxBatchSize, int maxInFlightRequests, @@ -81,8 +84,8 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe long maxTimeInBufferMS, long maxRecordSizeInBytes, boolean failOnError, - String streamName, - Properties kinesisClientProperties) { + String deliveryStreamName, + Properties firehoseClientProperties) { super( elementConverter, context, @@ -93,38 +96,43 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe maxTimeInBufferMS, maxRecordSizeInBytes); this.failOnError = failOnError; - this.streamName = streamName; + 
this.deliveryStreamName = deliveryStreamName; this.metrics = context.metricGroup(); this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter(); - this.client = buildClient(kinesisClientProperties); + this.client = buildClient(firehoseClientProperties); } - private KinesisAsyncClient buildClient(Properties kinesisClientProperties) { - + private FirehoseAsyncClient buildClient(Properties firehoseClientProperties) { final SdkAsyncHttpClient httpClient = - AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties); - - return AWSKinesisDataStreamsUtil.createKinesisAsyncClient( - kinesisClientProperties, httpClient); + AWSGeneralUtil.createAsyncHttpClient(firehoseClientProperties); + + return AWSAsyncSinkUtil.createAwsAsyncClient( + firehoseClientProperties, + httpClient, + FirehoseAsyncClient.builder(), + KinesisFirehoseConfigConstants.BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT, + KinesisFirehoseConfigConstants.FIREHOSE_CLIENT_USER_AGENT_PREFIX); } @Override protected void submitRequestEntries( - List<PutRecordsRequestEntry> requestEntries, - Consumer<List<PutRecordsRequestEntry>> requestResult) { + List<Record> requestEntries, Consumer<Collection<Record>> requestResult) { - PutRecordsRequest batchRequest = - PutRecordsRequest.builder().records(requestEntries).streamName(streamName).build(); + PutRecordBatchRequest batchRequest = + PutRecordBatchRequest.builder() + .records(requestEntries) + .deliveryStreamName(deliveryStreamName) + .build(); - LOG.trace("Request to submit {} entries to KDS using KDS Sink.", requestEntries.size()); + LOG.trace("Request to submit {} entries to KDF using KDF Sink.", requestEntries.size()); - CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest); + CompletableFuture<PutRecordBatchResponse> future = client.putRecordBatch(batchRequest); future.whenComplete( (response, err) -> { if (err != null) { handleFullyFailedRequest(err, requestEntries, requestResult); - } else if (response.failedRecordCount() > 
0) { + } else if (response.failedPutCount() > 0) { handlePartiallyFailedRequest(response, requestEntries, requestResult); } else { requestResult.accept(Collections.emptyList()); @@ -133,15 +141,19 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe } @Override - protected long getSizeInBytes(PutRecordsRequestEntry requestEntry) { + protected long getSizeInBytes(Record requestEntry) { return requestEntry.data().asByteArrayUnsafe().length; } private void handleFullyFailedRequest( Throwable err, - List<PutRecordsRequestEntry> requestEntries, - Consumer<List<PutRecordsRequestEntry>> requestResult) { - LOG.warn("KDS Sink failed to persist {} entries to KDS", requestEntries.size(), err); + List<Record> requestEntries, + Consumer<Collection<Record>> requestResult) { + LOG.warn( + "KDF Sink failed to persist {} entries to KDF first request was {}", + requestEntries.size(), + requestEntries.get(0).toString(), + err); numRecordsOutErrorsCounter.inc(requestEntries.size()); if (isRetryable(err)) { @@ -150,20 +162,22 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe } private void handlePartiallyFailedRequest( - PutRecordsResponse response, - List<PutRecordsRequestEntry> requestEntries, - Consumer<List<PutRecordsRequestEntry>> requestResult) { - LOG.warn("KDS Sink failed to persist {} entries to KDS", response.failedRecordCount()); - numRecordsOutErrorsCounter.inc(response.failedRecordCount()); + PutRecordBatchResponse response, + List<Record> requestEntries, + Consumer<Collection<Record>> requestResult) { + LOG.warn( + "KDF Sink failed to persist {} entries to KDF first request was {}", + requestEntries.size(), + requestEntries.get(0).toString()); + numRecordsOutErrorsCounter.inc(response.failedPutCount()); if (failOnError) { getFatalExceptionCons() - .accept(new KinesisDataStreamsException.KinesisDataStreamsFailFastException()); + .accept(new KinesisFirehoseException.KinesisFirehoseFailFastException()); return; 
} - List<PutRecordsRequestEntry> failedRequestEntries = - new ArrayList<>(response.failedRecordCount()); - List<PutRecordsResultEntry> records = response.records(); + List<Record> failedRequestEntries = new ArrayList<>(response.failedPutCount()); + List<PutRecordBatchResponseEntry> records = response.requestResponses(); for (int i = 0; i < records.size(); i++) { if (records.get(i).errorCode() != null) { @@ -179,15 +193,13 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe && err.getCause() instanceof ResourceNotFoundException) { getFatalExceptionCons() .accept( - new KinesisDataStreamsException( + new KinesisFirehoseException( "Encountered non-recoverable exception", err)); return false; } if (failOnError) { getFatalExceptionCons() - .accept( - new KinesisDataStreamsException.KinesisDataStreamsFailFastException( - err)); + .accept(new KinesisFirehoseException.KinesisFirehoseFailFastException(err)); return false; } diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/resources/log4j2.properties b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/resources/log4j2.properties new file mode 100644 index 0000000..c64a340 --- /dev/null +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/resources/log4j2.properties @@ -0,0 +1,25 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +rootLogger.level = OFF +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java new file mode 100644 index 0000000..0dc85da --- /dev/null +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.firehose.sink; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import software.amazon.awssdk.services.firehose.model.Record; +
+/**
+ * Covers construction, defaults and sanity checking of {@link KinesisFirehoseSinkBuilder}.
+ *
+ * <p>Each test calls {@code build()} with one setting missing or invalid and checks the type and
+ * message of the exception that is raised.
+ */
+public class KinesisFirehoseSinkBuilderTest {
+    // Valid converter reused by the tests that exercise the delivery stream name checks.
+    private static final ElementConverter<String, Record> ELEMENT_CONVERTER_PLACEHOLDER =
+            KinesisFirehoseSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    /** A builder with a delivery stream name but no element converter must fail with an NPE. */
+    @Test
+    public void elementConverterOfSinkMustBeSetWhenBuilt() {
+        Assertions.assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(
+                        () ->
+                                KinesisFirehoseSink.builder()
+                                        .setDeliveryStreamName("deliveryStream")
+                                        .build())
+                .withMessageContaining(
+                        "ElementConverter must be not null when initializing the AsyncSinkBase.");
+    }
+
+    /** A builder with an element converter but no delivery stream name must fail with an NPE. */
+    @Test
+    public void streamNameOfSinkMustBeSetWhenBuilt() {
+        Assertions.assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(
+                        () ->
+                                KinesisFirehoseSink.<String>builder()
+                                        .setElementConverter(ELEMENT_CONVERTER_PLACEHOLDER)
+                                        .build())
+                .withMessageContaining(
+                        "The delivery stream name must not be null when initializing the KDF Sink.");
+    }
+
+    /** An empty (but non-null) delivery stream name must fail with an IllegalArgumentException. */
+    @Test
+    public void streamNameOfSinkMustBeSetToNonEmptyWhenBuilt() {
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                KinesisFirehoseSink.<String>builder()
+                                        .setDeliveryStreamName("")
+                                        .setElementConverter(ELEMENT_CONVERTER_PLACEHOLDER)
+                                        .build())
+                .withMessageContaining(
+                        "The delivery stream name must be set when initializing the KDF Sink.");
+    }
+}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java new file mode 100644 index 0000000..ed0b1c7 --- /dev/null +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.firehose.sink; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.firehose.model.Record; + +import static org.assertj.core.api.Assertions.assertThat; +
+/**
+ * Covers construction and sanity checking of {@link KinesisFirehoseSinkElementConverter}.
+ *
+ * <p>One test checks the builder's null check on the serialization schema, the other checks that
+ * the converted {@link Record} carries exactly the bytes produced by the schema.
+ */
+public class KinesisFirehoseSinkElementConverterTest {
+
+    /** Calling {@code build()} without a serialization schema must fail with an NPE. */
+    @Test
+    public void elementConverterWillComplainASerializationSchemaIsNotSetIfBuildIsCalledWithoutIt() {
+        Assertions.assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(() -> KinesisFirehoseSinkElementConverter.<String>builder().build())
+                .withMessageContaining(
+                        "No SerializationSchema was supplied to the KinesisFirehoseSinkElementConverter builder.");
+    }
+
+    /** The record payload must equal the bytes that the configured schema serializes to. */
+    @Test
+    public void elementConverterUsesProvidedSchemaToSerializeRecord() {
+        ElementConverter<String, Record> elementConverter =
+                KinesisFirehoseSinkElementConverter.<String>builder()
+                        .setSerializationSchema(new SimpleStringSchema())
+                        .build();
+
+        String testString = "{many hands make light work;";
+
+        // The second argument (the writer context) is passed as null here.
+        Record serializedRecord = elementConverter.apply(testString, null);
+        // Expected payload: the same string run through an independent schema instance.
+        byte[] serializedString = (new SimpleStringSchema()).serialize(testString);
+        assertThat(serializedRecord.data()).isEqualTo(SdkBytes.fromByteArray(serializedString));
+    }
+}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java new file mode 100644 index 0000000..08cb49b --- /dev/null +++ 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.firehose.sink; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.aws.testutils.LocalstackContainer; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.DockerImageVersions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.services.firehose.FirehoseAsyncClient; +import software.amazon.awssdk.services.firehose.model.Record; +import software.amazon.awssdk.services.iam.IamAsyncClient; +import 
software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.utils.ImmutableMap; + +import java.util.List; + +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createBucket; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createIAMRole; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getConfig; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getIamClient; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getS3Client; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.listBucketObjects; +import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createDeliveryStream; +import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.getFirehoseClient; +import static org.assertj.core.api.Assertions.assertThat; + +/** Integration test suite for the {@code KinesisFirehoseSink} using a localstack container. 
*/
+public class KinesisFirehoseSinkITCase {
+
+    // Converter handed to the sink under test; serializes each String with SimpleStringSchema.
+    private static final ElementConverter<String, Record> elementConverter =
+            KinesisFirehoseSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkITCase.class);
+    private S3AsyncClient s3AsyncClient;
+    private FirehoseAsyncClient firehoseAsyncClient;
+    private IamAsyncClient iamAsyncClient;
+
+    private static final String ROLE_NAME = "super-role";
+    private static final String ROLE_ARN = "arn:aws:iam::000000000000:role/" + ROLE_NAME;
+    private static final String BUCKET_NAME = "s3-firehose";
+    private static final String STREAM_NAME = "s3-stream";
+    private static final int NUMBER_OF_ELEMENTS = 92;
+
+    @ClassRule
+    public static LocalstackContainer mockFirehoseContainer =
+            new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK));
+
+    /** Creates the S3, Firehose and IAM clients, all pointed at the Localstack endpoint. */
+    @Before
+    public void setup() throws Exception {
+        // NOTE(review): CBOR is switched off for the SDK, presumably because the Localstack
+        // endpoint does not handle it - confirm. teardown() clears the property again.
+        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
+        s3AsyncClient = getS3Client(mockFirehoseContainer.getEndpoint());
+        firehoseAsyncClient = getFirehoseClient(mockFirehoseContainer.getEndpoint());
+        iamAsyncClient = getIamClient(mockFirehoseContainer.getEndpoint());
+    }
+
+    /** Removes the system property set in {@link #setup()}. */
+    @After
+    public void teardown() {
+        System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
+    }
+
+    /**
+     * Provisions a bucket, an IAM role and a delivery stream, runs a job that writes {@code
+     * NUMBER_OF_ELEMENTS} JSON strings through the sink, and checks how many objects ended up in
+     * the bucket.
+     */
+    @Test
+    public void test() throws Exception {
+        LOG.info("1 - Creating the bucket for Firehose to deliver into...");
+        createBucket(s3AsyncClient, BUCKET_NAME);
+        LOG.info("2 - Creating the IAM Role for Firehose to write into the s3 bucket...");
+        createIAMRole(iamAsyncClient, ROLE_NAME);
+        LOG.info("3 - Creating the Firehose delivery stream...");
+        createDeliveryStream(STREAM_NAME, BUCKET_NAME, ROLE_ARN, firehoseAsyncClient);
+
+        ObjectMapper mapper = new ObjectMapper();
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // Source: the numbers 1..NUMBER_OF_ELEMENTS, each wrapped as {"data": "<n>"}.
+        DataStream<String> generator =
+                env.fromSequence(1, NUMBER_OF_ELEMENTS)
+                        .map(Object::toString)
+                        .returns(String.class)
+                        .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data)));
+
+        // Batch size 1: the final assertion expects one bucket object per element, which
+        // presumably relies on every record being delivered as its own object - confirm.
+        KinesisFirehoseSink<String> kdfSink =
+                KinesisFirehoseSink.<String>builder()
+                        .setElementConverter(elementConverter)
+                        .setDeliveryStreamName(STREAM_NAME)
+                        .setMaxBatchSize(1)
+                        .setFirehoseClientProperties(getConfig(mockFirehoseContainer.getEndpoint()))
+                        .build();
+
+        generator.sinkTo(kdfSink);
+        env.execute("Integration Test");
+
+        List<S3Object> objects = listBucketObjects(s3AsyncClient, BUCKET_NAME);
+        // hasSize reports the actual list contents on failure instead of just two integers.
+        assertThat(objects).hasSize(NUMBER_OF_ELEMENTS);
+    }
+}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkTest.java new file mode 100644 index 0000000..164ec39 --- /dev/null +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.firehose.sink; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import software.amazon.awssdk.services.firehose.model.Record; + +import java.util.Properties; +
+/**
+ * Covers construction, defaults and sanity checking of {@link KinesisFirehoseSink}.
+ *
+ * <p>Both tests call the sink constructor directly (not the builder) and differ only in the
+ * delivery stream name argument.
+ */
+public class KinesisFirehoseSinkTest {
+
+    // Valid converter so that only the delivery stream name argument can trigger a failure.
+    private static final ElementConverter<String, Record> elementConverter =
+            KinesisFirehoseSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    /** A {@code null} delivery stream name must fail with an NPE. */
+    @Test
+    public void deliveryStreamNameMustNotBeNull() {
+        // NOTE(review): the numeric arguments are presumably the batching/buffering limits and
+        // the boolean the fail-on-error flag - confirm against the KinesisFirehoseSink constructor.
+        Assertions.assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(
+                        () ->
+                                new KinesisFirehoseSink<>(
+                                        elementConverter,
+                                        500,
+                                        16,
+                                        10000,
+                                        4 * 1024 * 1024L,
+                                        5000L,
+                                        1000 * 1024L,
+                                        false,
+                                        null,
+                                        new Properties()))
+                .withMessageContaining(
+                        "The delivery stream name must not be null when initializing the KDF Sink.");
+    }
+
+    /** An empty delivery stream name must fail with an IllegalArgumentException. */
+    @Test
+    public void deliveryStreamNameMustNotBeEmpty() {
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                new KinesisFirehoseSink<>(
+                                        elementConverter,
+                                        500,
+                                        16,
+                                        10000,
+                                        4 * 1024 * 1024L,
+                                        5000L,
+                                        1000 * 1024L,
+                                        false,
+                                        "",
+                                        new Properties()))
+                .withMessageContaining(
+                        "The delivery stream name must be set when initializing the KDF Sink.");
+    }
+}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java new file mode 100644 index 0000000..d840033 --- /dev/null +++ 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.firehose.sink; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.connector.aws.config.AWSConfigConstants; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; + +import org.junit.Before; +import org.junit.Test; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.firehose.model.Record; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Properties; + +import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT; +import static org.assertj.core.api.Assertions.assertThat; + +/** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSinkWriter}. 
*/
+public class KinesisFirehoseSinkWriterTest {
+
+    // Writer created directly through its constructor in setup(); used by the size test.
+    private KinesisFirehoseSinkWriter<String> sinkWriter;
+
+    private static final ElementConverter<String, Record> ELEMENT_CONVERTER_PLACEHOLDER =
+            KinesisFirehoseSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    /** Builds a writer with failOnError enabled and only the AWS region configured. */
+    @Before
+    public void setup() {
+        TestSinkInitContext sinkInitContext = new TestSinkInitContext();
+        Properties sinkProperties = new Properties();
+        sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+        sinkWriter =
+                new KinesisFirehoseSinkWriter<>(
+                        ELEMENT_CONVERTER_PLACEHOLDER,
+                        sinkInitContext,
+                        50, // maxBatchSize
+                        16, // maxInFlightRequests
+                        10000, // NOTE(review): presumably maxBufferedRequests - confirm
+                        4 * 1024 * 1024, // NOTE(review): presumably maxBatchSizeInBytes - confirm
+                        5000, // maxTimeInBufferMS
+                        1000 * 1024, // maxRecordSizeInBytes
+                        true, // failOnError
+                        "streamName",
+                        sinkProperties);
+    }
+
+    /** The reported size must be the length of the raw payload bytes held by the record. */
+    @Test
+    public void getSizeInBytesReturnsSizeOfBlobBeforeBase64Encoding() {
+        String testString = "{many hands make light work;";
+        Record record = Record.builder().data(SdkBytes.fromUtf8String(testString)).build();
+        // The payload is encoded as UTF-8 above, so the expected length must use the same
+        // charset; US_ASCII only gave the same number because the sample string is pure ASCII.
+        assertThat(sinkWriter.getSizeInBytes(record))
+                .isEqualTo(testString.getBytes(StandardCharsets.UTF_8).length);
+    }
+
+    /**
+     * Writes 12 elements to a sink (batch size 6) whose endpoint is not a real service and checks
+     * that the error counter of the metric group ends up at 12.
+     */
+    @Test
+    public void getNumRecordsOutErrorsCounterRecordsCorrectNumberOfFailures()
+            throws IOException, InterruptedException {
+        Properties prop = new Properties();
+        prop.setProperty(AWSConfigConstants.AWS_REGION, Region.EU_WEST_1.toString());
+        prop.setProperty(AWS_ENDPOINT, "https://fake_aws_endpoint");
+        TestSinkInitContext ctx = new TestSinkInitContext();
+        KinesisFirehoseSink<String> kinesisFirehoseSink =
+                new KinesisFirehoseSink<>(
+                        ELEMENT_CONVERTER_PLACEHOLDER,
+                        6,
+                        16,
+                        10000,
+                        4 * 1024 * 1024L,
+                        5000L,
+                        1000 * 1024L,
+                        true,
+                        "test-stream",
+                        prop);
+        SinkWriter<String, Void, Collection<Record>> writer =
+                kinesisFirehoseSink.createWriter(ctx, new ArrayList<>());
+
+        for (int i = 0; i < 12; i++) {
+            writer.write("data_bytes", null);
+        }
+
+        // NOTE(review): this relies on both batches having already failed against the fake
+        // endpoint by the time the counter is read - confirm this cannot race.
+        assertThat(ctx.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(12);
+    }
+}
diff --git 
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java new file mode 100644 index 0000000..3e9d0ee --- /dev/null +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.firehose.sink.examples; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.aws.config.AWSConfigConstants; +import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink; +import org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkElementConverter; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import software.amazon.awssdk.services.firehose.FirehoseAsyncClient; +import software.amazon.awssdk.utils.ImmutableMap; + +import java.util.Properties; + +/** + * An example application demonstrating how to use the {@link KinesisFirehoseSink} to sink into KDF. + * + * <p>The {@link FirehoseAsyncClient} used here may be configured in the standard way for the AWS + * SDK 2.x. e.g. the provision of {@code AWS_ACCESS_KEY_ID} and {@code AWS_SECRET_ACCESS_KEY} + * through environment variables etc. 
+ */
+public class SinkIntoFirehose {
+
+    // Converts each String element into a Firehose record using SimpleStringSchema.
+    private static final KinesisFirehoseSinkElementConverter<String> elementConverter =
+            KinesisFirehoseSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    /**
+     * Builds and runs the example job: a generated sequence of numbers is turned into JSON strings
+     * and written to the delivery stream named {@code delivery-stream}.
+     *
+     * @param args command line arguments; not read by this example
+     * @throws Exception if building or executing the job fails
+     */
+    public static void main(String[] args) throws Exception {
+        ObjectMapper mapper = new ObjectMapper();
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10_000);
+
+        // Source: the numbers 1..10_000_000, each wrapped as {"data": "<n>"}.
+        DataStream<String> generator =
+                env.fromSequence(1, 10_000_000L)
+                        .map(Object::toString)
+                        .returns(String.class)
+                        .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data)));
+
+        // Only the region is set here; credentials are expected to come from the environment,
+        // as described in the class-level documentation.
+        Properties sinkProperties = new Properties();
+        sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+
+        KinesisFirehoseSink<String> kdfSink =
+                KinesisFirehoseSink.<String>builder()
+                        .setElementConverter(elementConverter)
+                        .setDeliveryStreamName("delivery-stream")
+                        .setMaxBatchSize(20)
+                        .setFirehoseClientProperties(sinkProperties)
+                        .build();
+
+        generator.sinkTo(kdfSink);
+
+        env.execute("KDF Async Sink Example Program");
+    }
+}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java new file mode 100644 index 0000000..1de389d --- /dev/null +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.firehose.sink.testutils; + +import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil; +import org.apache.flink.connector.firehose.sink.KinesisFirehoseConfigConstants; + +import software.amazon.awssdk.services.firehose.FirehoseAsyncClient; +import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamRequest; +import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamResponse; +import software.amazon.awssdk.services.firehose.model.DeliveryStreamType; +import software.amazon.awssdk.services.firehose.model.ExtendedS3DestinationConfiguration; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getConfig; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getHttpClient; + +/** + * A set of static methods that can be used to call common AWS services on the Localstack container. 
+ */
+public class KinesisFirehoseTestUtils {
+
+    /** Static helpers only; not meant to be instantiated. */
+    private KinesisFirehoseTestUtils() {}
+
+    /**
+     * Creates a {@link FirehoseAsyncClient} whose endpoint is overridden with {@code endpoint}.
+     *
+     * @param endpoint endpoint URL of the service to talk to; also used to derive the client
+     *     configuration and the HTTP client
+     * @return the configured asynchronous Firehose client
+     * @throws URISyntaxException if {@code endpoint} is not a valid URI
+     */
+    public static FirehoseAsyncClient getFirehoseClient(String endpoint) throws URISyntaxException {
+        return AWSAsyncSinkUtil.createAwsAsyncClient(
+                getConfig(endpoint),
+                getHttpClient(endpoint),
+                FirehoseAsyncClient.builder().endpointOverride(new URI(endpoint)),
+                KinesisFirehoseConfigConstants.BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT,
+                KinesisFirehoseConfigConstants.FIREHOSE_CLIENT_USER_AGENT_PREFIX);
+    }
+
+    /**
+     * Creates a {@code DIRECT_PUT} delivery stream with an extended S3 destination and blocks
+     * until the create request has completed.
+     *
+     * @param deliveryStreamName name of the delivery stream to create
+     * @param bucketName value placed in the destination's bucket ARN field
+     * @param roleARN ARN of the role set on the destination
+     * @param firehoseAsyncClient client used to issue the request
+     * @throws ExecutionException if the create request completes exceptionally
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public static void createDeliveryStream(
+            String deliveryStreamName,
+            String bucketName,
+            String roleARN,
+            FirehoseAsyncClient firehoseAsyncClient)
+            throws ExecutionException, InterruptedException {
+        // NOTE(review): a bare bucket name is passed where the SDK field is called bucketARN.
+        // This presumably only works because the target is Localstack - confirm before reusing
+        // this helper against a real endpoint.
+        ExtendedS3DestinationConfiguration s3Config =
+                ExtendedS3DestinationConfiguration.builder()
+                        .bucketARN(bucketName)
+                        .roleARN(roleARN)
+                        .build();
+        CreateDeliveryStreamRequest request =
+                CreateDeliveryStreamRequest.builder()
+                        .deliveryStreamName(deliveryStreamName)
+                        .extendedS3DestinationConfiguration(s3Config)
+                        .deliveryStreamType(DeliveryStreamType.DIRECT_PUT)
+                        .build();
+
+        CompletableFuture<CreateDeliveryStreamResponse> deliveryStream =
+                firehoseAsyncClient.createDeliveryStream(request);
+        // Wait for completion so callers can use the stream right after this method returns.
+        deliveryStream.get();
+    }
+}
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/resources/log4j2-test.properties similarity index 97% copy from flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties copy to flink-connectors/flink-connector-aws-kinesis-firehose/src/test/resources/log4j2-test.properties index c5339e7..c4fa187 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/resources/log4j2-test.properties @@ -18,7 +18,7 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for 
debugging purposes -rootLogger.level = INFO +rootLogger.level = OFF rootLogger.appenderRef.test.ref = TestLogger appender.testlogger.name = TestLogger diff --git a/flink-connectors/flink-connector-base/pom.xml b/flink-connectors/flink-connector-base/pom.xml index 751aecd..58fd369 100644 --- a/flink-connectors/flink-connector-base/pom.xml +++ b/flink-connectors/flink-connector-base/pom.xml @@ -72,4 +72,20 @@ <scope>test</scope> </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java index b508159..00a08b1 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java @@ -17,22 +17,8 @@ package org.apache.flink.connector.base.sink.writer; -import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.connector.sink.Sink; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.groups.OperatorIOMetricGroup; -import org.apache.flink.metrics.groups.SinkWriterMetricGroup; -import org.apache.flink.metrics.testutils.MetricListener; -import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; -import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; -import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor; import 
org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; -import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl; -import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl; -import org.apache.flink.util.UserCodeClassLoader; -import org.apache.flink.util.function.RunnableWithException; -import org.apache.flink.util.function.ThrowingRunnable; import org.junit.Before; import org.junit.Test; @@ -42,10 +28,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; -import java.util.Optional; -import java.util.OptionalLong; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -66,12 +49,12 @@ import static org.junit.jupiter.api.Assertions.fail; public class AsyncSinkWriterTest { private final List<Integer> res = new ArrayList<>(); - private SinkInitContext sinkInitContext; + private TestSinkInitContext sinkInitContext; @Before public void before() { res.clear(); - sinkInitContext = new SinkInitContext(); + sinkInitContext = new TestSinkInitContext(); } private void performNormalWriteOfEightyRecordsToMock() @@ -1008,105 +991,6 @@ public class AsyncSinkWriterTest { } } - private static class SinkInitContext implements Sink.InitContext { - - private static final TestProcessingTimeService processingTimeService; - private final MetricListener metricListener = new MetricListener(); - private final OperatorIOMetricGroup operatorIOMetricGroup = - UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup(); - private final SinkWriterMetricGroup metricGroup = - InternalSinkWriterMetricGroup.mock( - metricListener.getMetricGroup(), operatorIOMetricGroup); - - static { - processingTimeService = new TestProcessingTimeService(); - } - - @Override - public UserCodeClassLoader getUserCodeClassLoader() { - return null; - } - - 
@Override - public MailboxExecutor getMailboxExecutor() { - StreamTaskActionExecutor streamTaskActionExecutor = - new StreamTaskActionExecutor() { - @Override - public void run(RunnableWithException e) throws Exception { - e.run(); - } - - @Override - public <E extends Throwable> void runThrowing( - ThrowingRunnable<E> throwingRunnable) throws E { - throwingRunnable.run(); - } - - @Override - public <R> R call(Callable<R> callable) throws Exception { - return callable.call(); - } - }; - return new MailboxExecutorImpl( - new TaskMailboxImpl(Thread.currentThread()), - Integer.MAX_VALUE, - streamTaskActionExecutor); - } - - @Override - public Sink.ProcessingTimeService getProcessingTimeService() { - return new Sink.ProcessingTimeService() { - @Override - public long getCurrentProcessingTime() { - return processingTimeService.getCurrentProcessingTime(); - } - - @Override - public void registerProcessingTimer( - long time, ProcessingTimeCallback processingTimerCallback) { - processingTimeService.registerTimer( - time, processingTimerCallback::onProcessingTime); - } - }; - } - - @Override - public int getSubtaskId() { - return 0; - } - - @Override - public int getNumberOfParallelSubtasks() { - return 0; - } - - @Override - public SinkWriterMetricGroup metricGroup() { - return metricGroup; - } - - @Override - public OptionalLong getRestoredCheckpointId() { - return OptionalLong.empty(); - } - - public TestProcessingTimeService getTestProcessingTimeService() { - return processingTimeService; - } - - private Optional<Gauge<Long>> getCurrentSendTimeGauge() { - return metricListener.getGauge("currentSendTime"); - } - - private Counter getNumRecordsOutCounter() { - return metricGroup.getIOMetricGroup().getNumRecordsOutCounter(); - } - - private Counter getNumBytesOutCounter() { - return metricGroup.getIOMetricGroup().getNumBytesOutCounter(); - } - } - /** * This SinkWriter releases the lock on existing threads blocked by {@code delayedStartLatch} * and blocks itself until 
{@code blockedThreadLatch} is unblocked. diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java new file mode 100644 index 0000000..fba2711 --- /dev/null +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.sink.writer; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.groups.OperatorIOMetricGroup; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.apache.flink.metrics.testutils.MetricListener; +import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl; +import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl; +import org.apache.flink.util.UserCodeClassLoader; +import org.apache.flink.util.function.RunnableWithException; +import org.apache.flink.util.function.ThrowingRunnable; + +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.Callable; + +/** A mock implementation of a {@code Sink.InitContext} to be used in sink unit tests. 
*/ +public class TestSinkInitContext implements Sink.InitContext { + + private static final TestProcessingTimeService processingTimeService; + private final MetricListener metricListener = new MetricListener(); + private final OperatorIOMetricGroup operatorIOMetricGroup = + UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup(); + private final SinkWriterMetricGroup metricGroup = + InternalSinkWriterMetricGroup.mock( + metricListener.getMetricGroup(), operatorIOMetricGroup); + + static { + processingTimeService = new TestProcessingTimeService(); + } + + @Override + public UserCodeClassLoader getUserCodeClassLoader() { + return null; + } + + @Override + public MailboxExecutor getMailboxExecutor() { + StreamTaskActionExecutor streamTaskActionExecutor = + new StreamTaskActionExecutor() { + @Override + public void run(RunnableWithException e) throws Exception { + e.run(); + } + + @Override + public <E extends Throwable> void runThrowing( + ThrowingRunnable<E> throwingRunnable) throws E { + throwingRunnable.run(); + } + + @Override + public <R> R call(Callable<R> callable) throws Exception { + return callable.call(); + } + }; + return new MailboxExecutorImpl( + new TaskMailboxImpl(Thread.currentThread()), + Integer.MAX_VALUE, + streamTaskActionExecutor); + } + + @Override + public Sink.ProcessingTimeService getProcessingTimeService() { + return new Sink.ProcessingTimeService() { + @Override + public long getCurrentProcessingTime() { + return processingTimeService.getCurrentProcessingTime(); + } + + @Override + public void registerProcessingTimer( + long time, ProcessingTimeCallback processingTimerCallback) { + processingTimeService.registerTimer( + time, processingTimerCallback::onProcessingTime); + } + }; + } + + @Override + public int getSubtaskId() { + return 0; + } + + @Override + public int getNumberOfParallelSubtasks() { + return 0; + } + + @Override + public SinkWriterMetricGroup metricGroup() { + return metricGroup; + } + + @Override 
+ public OptionalLong getRestoredCheckpointId() { + return OptionalLong.empty(); + } + + public TestProcessingTimeService getTestProcessingTimeService() { + return processingTimeService; + } + + public Optional<Gauge<Long>> getCurrentSendTimeGauge() { + return metricListener.getGauge("currentSendTime"); + } + + public Counter getNumRecordsOutCounter() { + return metricGroup.getIOMetricGroup().getNumRecordsOutCounter(); + } + + public Counter getNumBytesOutCounter() { + return metricGroup.getIOMetricGroup().getNumBytesOutCounter(); + } +} diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java index e2e6a79..0677765 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.connectors.kinesis.proxy; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil; import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherConfiguration; +import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -190,7 +190,7 @@ public class KinesisProxyV2 implements KinesisProxyV2Interface { try { response = responseSupplier.get(); } catch (Exception ex) { - if (AWSKinesisDataStreamsUtil.isRecoverableException(ex)) { + if (AwsV2Util.isRecoverableException(ex)) { long backoffMillis = backoff.calculateFullJitterBackoff( jitterBase, jitterMax, jitterExponent, ++attempt); diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java index 03767cd..62f25db 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java @@ -18,9 +18,9 @@ package org.apache.flink.streaming.connectors.kinesis.proxy; import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil; import org.apache.flink.connector.aws.util.AWSGeneralUtil; -import org.apache.flink.connector.kinesis.config.AWSKinesisDataStreamsConfigConstants; -import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil; +import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsConfigConstants; import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherConfiguration; import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util; import org.apache.flink.util.Preconditions; @@ -63,13 +63,17 @@ public class KinesisProxyV2Factory { Properties legacyConfigProps = new Properties(configProps); legacyConfigProps.setProperty( - AWSKinesisDataStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX, - AWSKinesisDataStreamsUtil.formatFlinkUserAgentPrefix( - AWSKinesisDataStreamsConfigConstants - .BASE_KINESIS_USER_AGENT_PREFIX_FORMAT)); + KinesisDataStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX, + AWSAsyncSinkUtil.formatFlinkUserAgentPrefix( + KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT)); final KinesisAsyncClient client = - AWSKinesisDataStreamsUtil.createKinesisAsyncClient(legacyConfigProps, httpClient); + 
AWSAsyncSinkUtil.createAwsAsyncClient( + legacyConfigProps, + httpClient, + KinesisAsyncClient.builder(), + KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT, + KinesisDataStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX); return new KinesisProxyV2(client, httpClient, configuration, BACKOFF); } diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java index 5c949fe..684234c 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java @@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.kinesis.util; import org.apache.flink.annotation.Internal; import org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider; -import org.apache.flink.connector.kinesis.config.AWSKinesisDataStreamsConfigConstants; -import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil; +import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil; +import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsConfigConstants; import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; @@ -83,9 +83,8 @@ public class AWSUtil { Properties configProps, ClientConfiguration awsClientConfig) { // set a Flink-specific user agent awsClientConfig.setUserAgentPrefix( - AWSKinesisDataStreamsUtil.formatFlinkUserAgentPrefix( - AWSKinesisDataStreamsConfigConstants - .BASE_KINESIS_USER_AGENT_PREFIX_FORMAT)); + AWSAsyncSinkUtil.formatFlinkUserAgentPrefix( + 
KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT)); // utilize automatic refreshment of credentials by directly passing the // AWSCredentialsProvider @@ -133,7 +132,7 @@ public class AWSUtil { private static AWSCredentialsProvider getCredentialsProvider( final Properties configProps, final String configPrefix) { CredentialProvider credentialProviderType = - AWSKinesisDataStreamsUtil.getCredentialProviderType(configProps, configPrefix); + AWSAsyncSinkUtil.getCredentialProviderType(configProps, configPrefix); switch (credentialProviderType) { case ENV_VAR: diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java index 609e879..aa62a65 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java @@ -20,6 +20,8 @@ package org.apache.flink.streaming.connectors.kinesis.util; import org.apache.flink.annotation.Internal; import software.amazon.awssdk.http.SdkHttpConfigurationOption; +import software.amazon.awssdk.services.kinesis.model.LimitExceededException; +import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; import software.amazon.awssdk.utils.AttributeMap; import java.time.Duration; @@ -70,4 +72,10 @@ public class AwsV2Util { public static boolean isNoneEfoRegistrationType(final Properties properties) { return NONE.name().equals(properties.get(EFO_REGISTRATION_TYPE)); } + + public static boolean isRecoverableException(Exception e) { + Throwable cause = e.getCause(); + return cause instanceof LimitExceededException + || cause instanceof ProvisionedThroughputExceededException; + } } diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java index 47cf23f..ea9f03d 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java @@ -19,10 +19,12 @@ package org.apache.flink.streaming.connectors.kinesis.util; import org.junit.Test; import software.amazon.awssdk.http.SdkHttpConfigurationOption; +import software.amazon.awssdk.services.kinesis.model.LimitExceededException; import software.amazon.awssdk.utils.AttributeMap; import java.time.Duration; import java.util.Properties; +import java.util.concurrent.ExecutionException; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.EAGER; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.LAZY; @@ -131,4 +133,36 @@ public class AwsV2UtilTest { prop.setProperty(EFO_REGISTRATION_TYPE, NONE.name()); assertTrue(AwsV2Util.isNoneEfoRegistrationType(prop)); } + + @Test + public void testIsRecoverableExceptionForRecoverable() { + Exception recoverable = LimitExceededException.builder().build(); + assertTrue(AwsV2Util.isRecoverableException(new ExecutionException(recoverable))); + } + + @Test + public void testIsRecoverableExceptionForNonRecoverable() { + Exception nonRecoverable = new IllegalArgumentException("abc"); + assertFalse(AwsV2Util.isRecoverableException(new ExecutionException(nonRecoverable))); + } + + @Test + public void testIsRecoverableExceptionForRuntimeExceptionWrappingRecoverable() { + Exception recoverable = LimitExceededException.builder().build(); + Exception runtime = new 
RuntimeException("abc", recoverable); + assertTrue(AwsV2Util.isRecoverableException(runtime)); + } + + @Test + public void testIsRecoverableExceptionForRuntimeExceptionWrappingNonRecoverable() { + Exception nonRecoverable = new IllegalArgumentException("abc"); + Exception runtime = new RuntimeException("abc", nonRecoverable); + assertFalse(AwsV2Util.isRecoverableException(runtime)); + } + + @Test + public void testIsRecoverableExceptionForNullCause() { + Exception nonRecoverable = new IllegalArgumentException("abc"); + assertFalse(AwsV2Util.isRecoverableException(nonRecoverable)); + } } diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml index 7495412..a4c97b7 100644 --- a/flink-connectors/pom.xml +++ b/flink-connectors/pom.xml @@ -55,6 +55,7 @@ under the License. <module>flink-connector-aws-base</module> <module>flink-connector-kinesis</module> <module>flink-connector-aws-kinesis-data-streams</module> + <module>flink-connector-aws-kinesis-firehose</module> <module>flink-connector-base</module> <module>flink-file-sink-common</module> <module>flink-connector-files</module> diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java index afc6d9b..99c5293 100644 --- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java @@ -37,6 +37,8 @@ public class DockerImageVersions { public static final String KINESALITE = "instructure/kinesalite:latest"; + public static final String LOCALSTACK = "localstack/localstack:latest"; + public static final String PULSAR = "apachepulsar/pulsar:2.8.0"; public static final String CASSANDRA_3 = "cassandra:3.0"; diff --git a/tools/ci/stage.sh b/tools/ci/stage.sh index 7793c0b..d70c9ff 100755 
--- a/tools/ci/stage.sh +++ b/tools/ci/stage.sh @@ -109,6 +109,7 @@ flink-connectors/flink-connector-rabbitmq,\ flink-connectors/flink-connector-twitter,\ flink-connectors/flink-connector-kinesis,\ flink-connectors/flink-connector-aws-kinesis-data-streams,\ +flink-connectors/flink-connector-aws-kinesis-firehose,\ flink-metrics/flink-metrics-dropwizard,\ flink-metrics/flink-metrics-graphite,\ flink-metrics/flink-metrics-jmx,\
