This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9050fd56591cbcb3270dd40ea370ea605bda8d39 Author: Zichen Liu <[email protected]> AuthorDate: Mon Jan 24 18:55:17 2022 +0000 [FLINK-24228][connectors/firehose] Allows end users to supply a serialization schema rather than an ElementConverter, thereby encapsulating the Firehose `Record` from the user, verifying stream objects in KinesisFirehoseITCase --- .../docs/connectors/datastream/firehose.md | 56 +++++++--------- .../content/docs/connectors/datastream/firehose.md | 56 +++++++--------- .../aws/testutils/AWSServicesTestUtils.java | 44 +++++++++++++ .../firehose/sink/KinesisFirehoseSinkBuilder.java | 27 ++++++-- .../sink/KinesisFirehoseSinkElementConverter.java | 8 +-- .../firehose/sink/KinesisFirehoseSinkWriter.java | 9 +-- .../sink/KinesisFirehoseSinkBuilderTest.java | 16 ++--- .../KinesisFirehoseSinkElementConverterTest.java | 2 +- .../firehose/sink/KinesisFirehoseSinkITCase.java | 31 +++++---- .../firehose/sink/examples/SinkIntoFirehose.java | 74 ---------------------- 10 files changed, 143 insertions(+), 180 deletions(-) diff --git a/docs/content.zh/docs/connectors/datastream/firehose.md b/docs/content.zh/docs/connectors/datastream/firehose.md index ecc77ce..e958a7c 100644 --- a/docs/content.zh/docs/connectors/datastream/firehose.md +++ b/docs/content.zh/docs/connectors/datastream/firehose.md @@ -38,11 +38,6 @@ The `KinesisFirehoseSink` uses [AWS v2 SDK for Java](https://docs.aws.amazon.com {{< tabs "42vs28vdth5-nm76-6dz1-5m7s-5y345bu56se5u66je" >}} {{< tab "Java" >}} ```java -KinesisFirehoseSinkElementConverter<String> elementConverter = - KinesisFirehoseSinkElementConverter.<String>builder() - .setSerializationSchema(new SimpleStringSchema()) - .build(); - Properties sinkProperties = new Properties(); // Required sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); @@ -52,16 +47,16 @@ sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_ KinesisFirehoseSink<String> kdfSink = KinesisFirehoseSink.<String>builder() - .setFirehoseClientProperties(sinkProperties) // Required - .setElementConverter(elementConverter) // Required - .setDeliveryStreamName("your-stream-name") // Required - .setFailOnError(false) // Optional - .setMaxBatchSize(500) // Optional - .setMaxInFlightRequests(50) // Optional - .setMaxBufferedRequests(10_000) // Optional - .setMaxBatchSizeInBytes(4 * 1024 * 1024) // Optional - .setMaxTimeInBufferMS(5000) // Optional - .setMaxRecordSizeInBytes(1000 * 1024) // Optional + .setFirehoseClientProperties(sinkProperties) // Required + .setSerializationSchema(new SimpleStringSchema()) // Required + .setDeliveryStreamName("your-stream-name") // Required + .setFailOnError(false) // Optional + .setMaxBatchSize(500) // Optional + .setMaxInFlightRequests(50) // Optional + .setMaxBufferedRequests(10_000) // Optional + .setMaxBatchSizeInBytes(4 * 1024 * 1024) // Optional + .setMaxTimeInBufferMS(5000) // Optional + .setMaxRecordSizeInBytes(1000 * 1024) // Optional .build(); flinkStream.sinkTo(kdfSink); @@ -69,11 +64,6 @@ flinkStream.sinkTo(kdfSink); {{< /tab >}} {{< tab "Scala" >}} ```scala -val elementConverter = - KinesisFirehoseSinkElementConverter.<String>builder() - .setSerializationSchema(new SimpleStringSchema()) - .build() - Properties sinkProperties = new Properties() // Required sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1") @@ -83,16 +73,16 @@ sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_ val kdfSink = KinesisFirehoseSink.<String>builder() - .setFirehoseClientProperties(sinkProperties) // Required - .setElementConverter(elementConverter) // Required - .setDeliveryStreamName("your-stream-name") // Required - .setFailOnError(false) // Optional - .setMaxBatchSize(500) // Optional - .setMaxInFlightRequests(50) // Optional - .setMaxBufferedRequests(10_000) // Optional - .setMaxBatchSizeInBytes(4 * 1024 * 1024) // Optional - .setMaxTimeInBufferMS(5000) // Optional - .setMaxRecordSizeInBytes(1000 * 1024) // Optional + .setFirehoseClientProperties(sinkProperties) // Required + .setSerializationSchema(new SimpleStringSchema()) // Required + .setDeliveryStreamName("your-stream-name") // Required + .setFailOnError(false) // Optional + .setMaxBatchSize(500) // Optional + .setMaxInFlightRequests(50) // Optional + .setMaxBufferedRequests(10_000) // Optional + .setMaxBatchSizeInBytes(4 * 1024 * 1024) // Optional + .setMaxTimeInBufferMS(5000) // Optional + .setMaxRecordSizeInBytes(1000 * 1024) // Optional .build() flinkStream.sinkTo(kdfSink) @@ -102,14 +92,14 @@ flinkStream.sinkTo(kdfSink) ## Configurations -Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<String>builder()`. +Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<InputType>builder()`. 1. __setFirehoseClientProperties(Properties sinkProperties)__ * Required. * Supplies credentials, region and other parameters to the Firehose client. -2. __setElementConverter(KinesisFirehoseSinkElementConverter elementConverter)__ +2. __setSerializationSchema(SerializationSchema<InputType> serializationSchema)__ * Required. - * Supplies a serialization schema to the output. May be built using the following builder `KinesisFirehoseSinkElementConverter.<String>builder()` as per the example. + * Supplies a serialization schema to the Sink. This schema is used to serialize elements before sending to Firehose. 3. __setDeliveryStreamName(String deliveryStreamName)__ * Required. * Name of the delivery stream to sink to. diff --git a/docs/content/docs/connectors/datastream/firehose.md b/docs/content/docs/connectors/datastream/firehose.md index b224665..20beb61 100644 --- a/docs/content/docs/connectors/datastream/firehose.md +++ b/docs/content/docs/connectors/datastream/firehose.md @@ -38,11 +38,6 @@ The `KinesisFirehoseSink` uses [AWS v2 SDK for Java](https://docs.aws.amazon.com {{< tabs "42vs28vdth5-nm76-6dz1-5m7s-5y345bu56se5u66je" >}} {{< tab "Java" >}} ```java -KinesisFirehoseSinkElementConverter<String> elementConverter = - KinesisFirehoseSinkElementConverter.<String>builder() - .setSerializationSchema(new SimpleStringSchema()) - .build(); - Properties sinkProperties = new Properties(); // Required sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); @@ -52,16 +47,16 @@ sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_ KinesisFirehoseSink<String> kdfSink = KinesisFirehoseSink.<String>builder() - .setFirehoseClientProperties(sinkProperties) // Required - .setElementConverter(elementConverter) // Required - .setDeliveryStreamName("your-stream-name") // Required - .setFailOnError(false) // Optional - .setMaxBatchSize(500) // Optional - .setMaxInFlightRequests(50) // Optional - .setMaxBufferedRequests(10_000) // Optional - .setMaxBatchSizeInBytes(4 * 1024 * 1024) // Optional - .setMaxTimeInBufferMS(5000) // Optional - .setMaxRecordSizeInBytes(1000 * 1024) // Optional + .setFirehoseClientProperties(sinkProperties) // Required + .setSerializationSchema(new SimpleStringSchema()) // Required + .setDeliveryStreamName("your-stream-name") // Required + .setFailOnError(false) // Optional + .setMaxBatchSize(500) // Optional + .setMaxInFlightRequests(50) // Optional + .setMaxBufferedRequests(10_000) // Optional + .setMaxBatchSizeInBytes(4 * 1024 * 1024) // Optional + .setMaxTimeInBufferMS(5000) // Optional + .setMaxRecordSizeInBytes(1000 * 1024) // Optional .build(); flinkStream.sinkTo(kdfSink); @@ -69,11 +64,6 @@ flinkStream.sinkTo(kdfSink); {{< /tab >}} {{< tab "Scala" >}} ```scala -val elementConverter = - KinesisFirehoseSinkElementConverter.<String>builder() - .setSerializationSchema(new SimpleStringSchema()) - .build() - Properties sinkProperties = new Properties() // Required sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1") @@ -83,16 +73,16 @@ sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_ val kdfSink = KinesisFirehoseSink.<String>builder() - .setFirehoseClientProperties(sinkProperties) // Required - .setElementConverter(elementConverter) // Required - .setDeliveryStreamName("your-stream-name") // Required - .setFailOnError(false) // Optional - .setMaxBatchSize(500) // Optional - .setMaxInFlightRequests(50) // Optional - .setMaxBufferedRequests(10_000) // Optional - .setMaxBatchSizeInBytes(4 * 1024 * 1024) // Optional - .setMaxTimeInBufferMS(5000) // Optional - .setMaxRecordSizeInBytes(1000 * 1024) // Optional + .setFirehoseClientProperties(sinkProperties) // Required + .setSerializationSchema(new SimpleStringSchema()) // Required + .setDeliveryStreamName("your-stream-name") // Required + .setFailOnError(false) // Optional + .setMaxBatchSize(500) // Optional + .setMaxInFlightRequests(50) // Optional + .setMaxBufferedRequests(10_000) // Optional + .setMaxBatchSizeInBytes(4 * 1024 * 1024) // Optional + .setMaxTimeInBufferMS(5000) // Optional + .setMaxRecordSizeInBytes(1000 * 1024) // Optional .build() flinkStream.sinkTo(kdfSink) @@ -102,14 +92,14 @@ flinkStream.sinkTo(kdfSink) ## Configurations -Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<String>builder()`. +Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<InputType>builder()`. 1. __setFirehoseClientProperties(Properties sinkProperties)__ * Required. * Supplies credentials, region and other parameters to the Firehose client. -2. __setElementConverter(KinesisFirehoseSinkElementConverter elementConverter)__ +2. __setSerializationSchema(SerializationSchema<InputType> serializationSchema)__ * Required. - * Supplies a serialization schema to the output. May be built using the following builder `KinesisFirehoseSinkElementConverter.<String>builder()` as per the example. + * Supplies a serialization schema to the Sink. This schema is used to serialize elements before sending to Firehose. 3. __setDeliveryStreamName(String deliveryStreamName)__ * Required. * Name of the delivery stream to sink to. diff --git a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java index 543d81b..ff915fa 100644 --- a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java +++ b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java @@ -23,6 +23,8 @@ import org.apache.flink.connector.aws.util.AWSGeneralUtil; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.waiters.WaiterResponse; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.regions.Region; @@ -31,6 +33,8 @@ import software.amazon.awssdk.services.iam.model.CreateRoleRequest; import software.amazon.awssdk.services.iam.model.CreateRoleResponse; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.HeadBucketRequest; import software.amazon.awssdk.services.s3.model.HeadBucketResponse; import software.amazon.awssdk.services.s3.model.ListObjectsRequest; @@ -44,6 +48,8 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.function.Function; +import java.util.stream.Collectors; import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER; import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT; @@ -126,4 +132,42 @@ public class AWSServicesTestUtils { CompletableFuture<ListObjectsResponse> res = s3.listObjects(listObjects); return res.get().contents(); } + + public static <T> List<T> readObjectsFromS3Bucket( + S3AsyncClient s3AsyncClient, + List<S3Object> objects, + String bucketName, + Function<ResponseBytes<GetObjectResponse>, T> deserializer) { + S3BucketReader bucketReader = new S3BucketReader(s3AsyncClient, bucketName); + return bucketReader.readObjects(objects, deserializer); + } + + /** Helper class to read objects from S3. */ + private static class S3BucketReader { + private final S3AsyncClient s3AsyncClient; + private final String bucketName; + + public S3BucketReader(S3AsyncClient s3AsyncClient, String bucketName) { + this.s3AsyncClient = s3AsyncClient; + this.bucketName = bucketName; + } + + public <T> List<T> readObjects( + List<S3Object> objectList, + Function<ResponseBytes<GetObjectResponse>, T> deserializer) { + return objectList.stream() + .map(object -> readObjectWitKey(object.key(), deserializer)) + .collect(Collectors.toList()); + } + + public <T> T readObjectWitKey( + String key, Function<ResponseBytes<GetObjectResponse>, T> deserializer) { + GetObjectRequest getObjectRequest = + GetObjectRequest.builder().bucket(bucketName).key(key).build(); + return s3AsyncClient + .getObject(getObjectRequest, AsyncResponseTransformer.toBytes()) + .thenApply(deserializer) + .join(); + } + } } diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java index ee22e1f..a180abc 100644 --- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.firehose.sink; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; import software.amazon.awssdk.http.Protocol; @@ -36,11 +37,6 @@ import static software.amazon.awssdk.http.Protocol.HTTP1_1; * writes String values to a Kinesis Data Firehose delivery stream named delivery-stream-name. * * <pre>{@code - * private static final KinesisFirehoseSinkElementConverter<String> elementConverter = - * KinesisFirehoseSinkElementConverter.<String>builder() - * .setSerializationSchema(new SimpleStringSchema()) - * .build(); - * * Properties sinkProperties = new Properties(); * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); * @@ -50,6 +46,7 @@ import static software.amazon.awssdk.http.Protocol.HTTP1_1; * .setDeliveryStreamName("delivery-stream-name") * .setMaxBatchSize(20) * .setFirehoseClientProperties(sinkProperties) + * .setSerializationSchema(new SimpleStringSchema()) * .build(); * }</pre> * @@ -73,7 +70,7 @@ public class KinesisFirehoseSinkBuilder<InputT> private static final int DEFAULT_MAX_BATCH_SIZE = 500; private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50; - private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10000; + private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10_000; private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 4 * 1024 * 1024; private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000; private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 1000 * 1024; @@ -83,6 +80,7 @@ public class KinesisFirehoseSinkBuilder<InputT> private Boolean failOnError; private String deliveryStreamName; private Properties firehoseClientProperties; + private SerializationSchema<InputT> serializationSchema; KinesisFirehoseSinkBuilder() {} @@ -100,6 +98,19 @@ public class KinesisFirehoseSinkBuilder<InputT> } /** + * Allows the user to specify a serialization schema to serialize each record to persist to + * Firehose. + * + * @param serializationSchema serialization schema to use + * @return {@link KinesisFirehoseSinkBuilder} itself + */ + public KinesisFirehoseSinkBuilder<InputT> setSerializationSchema( + SerializationSchema<InputT> serializationSchema) { + this.serializationSchema = serializationSchema; + return this; + } + + /** * If writing to Kinesis Data Firehose results in a partial or full failure being returned, the * job will fail immediately with a {@link KinesisFirehoseException} if failOnError is set. * @@ -134,7 +145,9 @@ public class KinesisFirehoseSinkBuilder<InputT> @Override public KinesisFirehoseSink<InputT> build() { return new KinesisFirehoseSink<>( - getElementConverter(), + KinesisFirehoseSinkElementConverter.<InputT>builder() + .setSerializationSchema(serializationSchema) + .build(), Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE), Optional.ofNullable(getMaxInFlightRequests()) .orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS), diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java index 45b4186..cca749c 100644 --- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java @@ -17,7 +17,7 @@ package org.apache.flink.connector.firehose.sink; -import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.connector.sink.SinkWriter; import org.apache.flink.connector.base.sink.writer.ElementConverter; @@ -31,7 +31,7 @@ import software.amazon.awssdk.services.firehose.model.Record; * needs to provide a {@link SerializationSchema} of the {@code InputT} to transform it into a * {@link Record} that may be persisted. */ -@PublicEvolving +@Internal public class KinesisFirehoseSinkElementConverter<InputT> implements ElementConverter<InputT, Record> { @@ -54,7 +54,6 @@ public class KinesisFirehoseSinkElementConverter<InputT> } /** A builder for the KinesisFirehoseSinkElementConverter. */ - @PublicEvolving public static class Builder<InputT> { private SerializationSchema<InputT> serializationSchema; @@ -68,8 +67,7 @@ public class KinesisFirehoseSinkElementConverter<InputT> public KinesisFirehoseSinkElementConverter<InputT> build() { Preconditions.checkNotNull( serializationSchema, - "No SerializationSchema was supplied to the " - + "KinesisFirehoseSinkElementConverter builder."); + "No SerializationSchema was supplied to the " + "KinesisFirehoseSink builder."); return new KinesisFirehoseSinkElementConverter<>(serializationSchema); } } diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java index 4002ec3..b41103f 100644 --- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java @@ -37,7 +37,6 @@ import software.amazon.awssdk.services.firehose.model.Record; import software.amazon.awssdk.services.firehose.model.ResourceNotFoundException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Properties; @@ -116,7 +115,7 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record> @Override protected void submitRequestEntries( - List<Record> requestEntries, Consumer<Collection<Record>> requestResult) { + List<Record> requestEntries, Consumer<List<Record>> requestResult) { PutRecordBatchRequest batchRequest = PutRecordBatchRequest.builder() @@ -146,9 +145,7 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record> } private void handleFullyFailedRequest( - Throwable err, - List<Record> requestEntries, - Consumer<Collection<Record>> requestResult) { + Throwable err, List<Record> requestEntries, Consumer<List<Record>> requestResult) { LOG.warn( "KDF Sink failed to persist {} entries to KDF first request was {}", requestEntries.size(), @@ -164,7 +161,7 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record> private void handlePartiallyFailedRequest( PutRecordBatchResponse response, List<Record> requestEntries, - Consumer<Collection<Record>> requestResult) { + Consumer<List<Record>> requestResult) { LOG.warn( "KDF Sink failed to persist {} entries to KDF first request was {}", requestEntries.size(), diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java index 0dc85da..88f4329 100644 --- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java @@ -17,19 +17,17 @@ package org.apache.flink.connector.firehose.sink; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema; -import org.apache.flink.connector.base.sink.writer.ElementConverter; import org.assertj.core.api.Assertions; import org.junit.Test; -import software.amazon.awssdk.services.firehose.model.Record; /** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSinkBuilder}. */ public class KinesisFirehoseSinkBuilderTest { - private static final ElementConverter<String, Record> ELEMENT_CONVERTER_PLACEHOLDER = - KinesisFirehoseSinkElementConverter.<String>builder() - .setSerializationSchema(new SimpleStringSchema()) - .build(); + + private static final SerializationSchema<String> SERIALIZATION_SCHEMA = + new SimpleStringSchema(); @Test public void elementConverterOfSinkMustBeSetWhenBuilt() { @@ -40,7 +38,7 @@ public class KinesisFirehoseSinkBuilderTest { .setDeliveryStreamName("deliveryStream") .build()) .withMessageContaining( - "ElementConverter must be not null when initializing the AsyncSinkBase."); + "No SerializationSchema was supplied to the KinesisFirehoseSink builder."); } @Test @@ -49,7 +47,7 @@ public class KinesisFirehoseSinkBuilderTest { .isThrownBy( () -> KinesisFirehoseSink.<String>builder() - .setElementConverter(ELEMENT_CONVERTER_PLACEHOLDER) + .setSerializationSchema(SERIALIZATION_SCHEMA) .build()) .withMessageContaining( "The delivery stream name must not be null when initializing the KDF Sink."); @@ -62,7 +60,7 @@ public class KinesisFirehoseSinkBuilderTest { () -> KinesisFirehoseSink.<String>builder() .setDeliveryStreamName("") - .setElementConverter(ELEMENT_CONVERTER_PLACEHOLDER) + .setSerializationSchema(SERIALIZATION_SCHEMA) .build()) .withMessageContaining( "The delivery stream name must be set when initializing the KDF Sink."); diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java index ed0b1c7..221f444 100644 --- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java @@ -35,7 +35,7 @@ public class KinesisFirehoseSinkElementConverterTest { Assertions.assertThatExceptionOfType(NullPointerException.class) .isThrownBy(() -> KinesisFirehoseSinkElementConverter.<String>builder().build()) .withMessageContaining( - "No SerializationSchema was supplied to the KinesisFirehoseSinkElementConverter builder."); + "No SerializationSchema was supplied to the KinesisFirehoseSink builder."); } @Test diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java index 08cb49b..8809437 100644 --- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java @@ -19,7 +19,6 @@ package org.apache.flink.connector.firehose.sink; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.aws.testutils.LocalstackContainer; -import org.apache.flink.connector.base.sink.writer.ElementConverter; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.DockerImageVersions; @@ -35,12 +34,12 @@ import org.slf4j.LoggerFactory; import org.testcontainers.utility.DockerImageName; import software.amazon.awssdk.core.SdkSystemSetting; import software.amazon.awssdk.services.firehose.FirehoseAsyncClient; -import software.amazon.awssdk.services.firehose.model.Record; import software.amazon.awssdk.services.iam.IamAsyncClient; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.S3Object; import software.amazon.awssdk.utils.ImmutableMap; +import java.util.ArrayList; import java.util.List; import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createBucket; @@ -49,6 +48,7 @@ import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getC import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getIamClient; import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getS3Client; import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.listBucketObjects; +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.readObjectsFromS3Bucket; import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createDeliveryStream; import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.getFirehoseClient; import static org.assertj.core.api.Assertions.assertThat; @@ -56,22 +56,17 @@ import static org.assertj.core.api.Assertions.assertThat; /** Integration test suite for the {@code KinesisFirehoseSink} using a localstack container. */ public class KinesisFirehoseSinkITCase { - private static final ElementConverter<String, Record> elementConverter = - KinesisFirehoseSinkElementConverter.<String>builder() - .setSerializationSchema(new SimpleStringSchema()) - .build(); - private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkITCase.class); - private S3AsyncClient s3AsyncClient; - private FirehoseAsyncClient firehoseAsyncClient; - private IamAsyncClient iamAsyncClient; - private static final String ROLE_NAME = "super-role"; private static final String ROLE_ARN = "arn:aws:iam::000000000000:role/" + ROLE_NAME; private static final String BUCKET_NAME = "s3-firehose"; private static final String STREAM_NAME = "s3-stream"; private static final int NUMBER_OF_ELEMENTS = 92; + private S3AsyncClient s3AsyncClient; + private FirehoseAsyncClient firehoseAsyncClient; + private IamAsyncClient iamAsyncClient; + @ClassRule public static LocalstackContainer mockFirehoseContainer = new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK)); @@ -106,10 +101,15 @@ public class KinesisFirehoseSinkITCase { .map(Object::toString) .returns(String.class) .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data))); + List<String> expectedElements = new ArrayList<>(); + for (int i = 1; i < NUMBER_OF_ELEMENTS; i++) { + expectedElements.add( + mapper.writeValueAsString(ImmutableMap.of("data", String.valueOf(i)))); + } KinesisFirehoseSink<String> kdsSink = KinesisFirehoseSink.<String>builder() - .setElementConverter(elementConverter) + .setSerializationSchema(new SimpleStringSchema()) .setDeliveryStreamName(STREAM_NAME) .setMaxBatchSize(1) .setFirehoseClientProperties(getConfig(mockFirehoseContainer.getEndpoint())) @@ -120,5 +120,12 @@ public class KinesisFirehoseSinkITCase { List<S3Object> objects = listBucketObjects(s3AsyncClient, BUCKET_NAME); assertThat(objects.size()).isEqualTo(NUMBER_OF_ELEMENTS); + assertThat( + readObjectsFromS3Bucket( + s3AsyncClient, + objects, + BUCKET_NAME, + response -> new String(response.asByteArrayUnsafe()))) + .containsAll(expectedElements); } } diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java deleted file mode 100644 index 3e9d0ee..0000000 --- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.firehose.sink.examples; - -import org.apache.flink.api.common.serialization.SimpleStringSchema; -import org.apache.flink.connector.aws.config.AWSConfigConstants; -import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink; -import org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkElementConverter; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; - -import software.amazon.awssdk.services.firehose.FirehoseAsyncClient; -import software.amazon.awssdk.utils.ImmutableMap; - -import java.util.Properties; - -/** - * An example application demonstrating how to use the {@link KinesisFirehoseSink} to sink into KDF. - * - * <p>The {@link FirehoseAsyncClient} used here may be configured in the standard way for the AWS - * SDK 2.x. e.g. the provision of {@code AWS_ACCESS_KEY_ID} and {@code AWS_SECRET_ACCESS_KEY} - * through environment variables etc. - */ -public class SinkIntoFirehose { - - private static final KinesisFirehoseSinkElementConverter<String> elementConverter = - KinesisFirehoseSinkElementConverter.<String>builder() - .setSerializationSchema(new SimpleStringSchema()) - .build(); - - public static void main(String[] args) throws Exception { - ObjectMapper mapper = new ObjectMapper(); - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.enableCheckpointing(10_000); - - DataStream<String> generator = - env.fromSequence(1, 10_000_000L) - .map(Object::toString) - .returns(String.class) - .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data))); - - Properties sinkProperties = new Properties(); - sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); - - KinesisFirehoseSink<String> kdfSink = - KinesisFirehoseSink.<String>builder() - .setElementConverter(elementConverter) - .setDeliveryStreamName("delivery-stream") - .setMaxBatchSize(20) - .setFirehoseClientProperties(sinkProperties) - .build(); - - generator.sinkTo(kdfSink); - - env.execute("KDF Async Sink Example Program"); - } -}
