dannycranmer commented on a change in pull request #18314:
URL: https://github.com/apache/flink/pull/18314#discussion_r789685805
##########
File path:
flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinksUtil.java
##########
@@ -15,36 +15,32 @@
* limitations under the License.
*/
-package org.apache.flink.connector.kinesis.util;
+package org.apache.flink.connector.aws.util;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
-import org.apache.flink.connector.aws.util.AWSGeneralUtil;
-import
org.apache.flink.connector.kinesis.config.AWSKinesisDataStreamsConfigConstants;
import org.apache.flink.runtime.util.EnvironmentInformation;
+import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.core.SdkClient;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
import software.amazon.awssdk.core.client.config.SdkClientOption;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
-import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
-import
software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import java.net.URI;
import java.util.Optional;
import java.util.Properties;
/** Some utilities specific to Amazon Web Service. */
@Internal
-public class AWSKinesisDataStreamsUtil extends AWSGeneralUtil {
+public class AWSAsyncSinksUtil extends AWSGeneralUtil {
Review comment:
I think we should drop the "s" to make singular, `AWSAsyncSinkUtil`
##########
File path:
flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink.examples;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink;
+import
org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkElementConverter;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
+import software.amazon.awssdk.utils.ImmutableMap;
+
+import java.util.Properties;
+
+/**
+ * An example application demonstrating how to use the {@link
KinesisFirehoseSink} to sink into KDF.
+ *
+ * <p>The {@link FirehoseAsyncClient} used here may be configured in the
standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_ACCESS_KEY_ID} and {@code
AWS_SECRET_ACCESS_KEY}
+ * through environment variables etc.
+ */
+public class SinkIntoFirehose {
+
+ private static final KinesisFirehoseSinkElementConverter<String>
elementConverter =
+ KinesisFirehoseSinkElementConverter.<String>builder()
+ .setSerializationSchema(new SimpleStringSchema())
+ .build();
+
+ public static void main(String[] args) throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10_000);
+
+ DataStream<String> generator =
+ env.fromSequence(1, 10_000_000L)
+ .map(Object::toString)
+ .returns(String.class)
+ .map(data ->
mapper.writeValueAsString(ImmutableMap.of("data", data)));
Review comment:
nit: Why not use a JSON serialisation schema instead of mapping to a string?
##########
File path:
flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.firehose.sink.testutils.LocalstackContainer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
+import software.amazon.awssdk.services.firehose.model.Record;
+import software.amazon.awssdk.services.iam.IamAsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.utils.ImmutableMap;
+
+import java.util.List;
+
+import static
org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createBucket;
+import static
org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createDeliveryStream;
+import static
org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createIAMRole;
+import static
org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.getConfig;
+import static
org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.getFirehoseClient;
+import static
org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.getIamClient;
+import static
org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.getS3Client;
+import static
org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.listBucketObjects;
+import static org.junit.Assert.assertEquals;
+
+/** Integration test suite for the {@code KinesisFirehoseSink} using a
localstack container. */
+public class KinesisFirehoseSinkITCase {
+
+ private static final ElementConverter<String, Record> elementConverter =
+ KinesisFirehoseSinkElementConverter.<String>builder()
+ .setSerializationSchema(new SimpleStringSchema())
+ .build();
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KinesisFirehoseSinkITCase.class);
+ private S3AsyncClient s3AsyncClient;
+ private FirehoseAsyncClient firehoseAsyncClient;
+ private IamAsyncClient iamAsyncClient;
+
+ private static final String ROLE_NAME = "super-role";
+ private static final String ROLE_ARN = "arn:aws:iam::000000000000:role/" +
ROLE_NAME;
+ private static final String BUCKET_NAME = "s3-firehose";
+ private static final String STREAM_NAME = "s3-stream";
+ private static final int NUMBER_OF_ELEMENTS = 92;
+
+ @ClassRule
+ public static LocalstackContainer mockFirehoseContainer =
+ new
LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK));
+
+ @Before
+ public void setup() throws Exception {
+ System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
+ s3AsyncClient = getS3Client(mockFirehoseContainer.getEndpoint());
+ firehoseAsyncClient =
getFirehoseClient(mockFirehoseContainer.getEndpoint());
+ iamAsyncClient = getIamClient(mockFirehoseContainer.getEndpoint());
+ }
+
+ @After
+ public void teardown() {
+ System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
+ }
+
+ @Test
+ public void test() throws Exception {
+ LOG.info("1 - Creating the bucket for Firehose to deliver into...");
+ createBucket(s3AsyncClient, BUCKET_NAME);
+ LOG.info("2 - Creating the IAM Role for Firehose to write into the s3
bucket...");
+ createIAMRole(iamAsyncClient, ROLE_NAME);
+ LOG.info("3 - Creating the Firehose delivery stream...");
+ createDeliveryStream(STREAM_NAME, BUCKET_NAME, ROLE_ARN,
firehoseAsyncClient);
+
+ ObjectMapper mapper = new ObjectMapper();
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<String> generator =
+ env.fromSequence(1, NUMBER_OF_ELEMENTS)
+ .map(Object::toString)
+ .returns(String.class)
+ .map(data ->
mapper.writeValueAsString(ImmutableMap.of("data", data)));
+
+ KinesisFirehoseSink<String> kdsSink =
+ KinesisFirehoseSink.<String>builder()
+ .setElementConverter(elementConverter)
+ .setDeliveryStreamName(STREAM_NAME)
+ .setMaxBatchSize(1)
+
.setFirehoseClientProperties(getConfig(mockFirehoseContainer.getEndpoint()))
+ .build();
+
+ generator.sinkTo(kdsSink);
+ env.execute("Integration Test");
+
+ List<S3Object> objects = listBucketObjects(s3AsyncClient, BUCKET_NAME);
+ assertEquals(NUMBER_OF_ELEMENTS, objects.size());
Review comment:
Please update this assertion to use AssertJ.
##########
File path:
flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+
+import org.junit.Before;
+import org.junit.Test;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Properties;
+
+import static
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Covers construction, defaults and sanity checking of {@link
KinesisFirehoseSinkWriter}. */
+public class KinesisFirehoseSinkWriterTest {
+
+ private KinesisFirehoseSinkWriter<String> sinkWriter;
+
+ private static final ElementConverter<String, Record>
ELEMENT_CONVERTER_PLACEHOLDER =
+ KinesisFirehoseSinkElementConverter.<String>builder()
+ .setSerializationSchema(new SimpleStringSchema())
+ .build();
+
+ @Before
+ public void setup() {
+ TestSinkInitContext sinkInitContext = new TestSinkInitContext();
+ Properties sinkProperties = new Properties();
+ sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ sinkWriter =
+ new KinesisFirehoseSinkWriter<>(
+ ELEMENT_CONVERTER_PLACEHOLDER,
+ sinkInitContext,
+ 50,
+ 16,
+ 10000,
+ 4 * 1024 * 1024,
+ 5000,
+ 1000 * 1024,
+ true,
+ "streamName",
+ sinkProperties);
+ }
+
+ @Test
+ public void getSizeInBytesReturnsSizeOfBlobBeforeBase64Encoding() {
+ String testString = "{many hands make light work;";
+ Record record =
Record.builder().data(SdkBytes.fromUtf8String(testString)).build();
+ assertEquals(
+ testString.getBytes(StandardCharsets.US_ASCII).length,
+ sinkWriter.getSizeInBytes(record));
Review comment:
Change this to AssertJ, and update the other instances as well.
##########
File path:
flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/LocalstackContainer.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink.testutils;
+
+import org.rnorth.ducttape.ratelimits.RateLimiter;
+import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
+import org.rnorth.ducttape.unreliables.Unreliables;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * A class wrapping the Localstack container that provides mock
implementations of many common AWS
+ * services.
+ */
+public class LocalstackContainer extends GenericContainer<LocalstackContainer>
{
Review comment:
Can this go in the AWS connector base?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]