dannycranmer commented on a change in pull request #17345:
URL: https://github.com/apache/flink/pull/17345#discussion_r763889383



##########
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsException.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+/**
+ * A {@link RuntimeException} wrapper indicating the exception was thrown from 
the Kinesis Data
+ * Streams Sink.
+ */
+class KinesisDataStreamsException extends RuntimeException {
+
+    public KinesisDataStreamsException(final String message) {
+        super(message);
+    }
+
+    public KinesisDataStreamsException(final String message, final Throwable 
cause) {
+        super(message, cause);
+    }
+
+    /**
+     * When the flag {@code failOnError} is set in {@link 
KinesisDataStreamsSinkWriter}, this
+     * exception is raised as soon as any exception occurs when KDS is written 
to.
+     */
+    static class KinesisDataStreamsFailFastException extends KinesisDataStreamsException {

Review comment:
       nit: I'm not comfortable with the name of this class. Something more 
general, such as `KinesisDataStreamsRuntimeException`, seems like a better 
fit. Thoughts?

##########
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilder.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+
+import java.util.Properties;
+
+/**
+ * Builder to construct {@link KinesisDataStreamsSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link 
KinesisDataStreamsSink} that
+ * writes String values to a Kinesis Data Streams stream named 
your_stream_here.
+ *
+ * <pre>{@code
+ * ElementConverter<String, PutRecordsRequestEntry> elementConverter =
+ *             KinesisDataStreamsSinkElementConverter.<String>builder()
+ *                     .setSerializationSchema(new SimpleStringSchema())
+ *                     .setPartitionKeyGenerator(element -> 
String.valueOf(element.hashCode()))
+ *                     .build();
+ *
+ * KinesisDataStreamsSink<String> kdsSink =
+ *                 KinesisDataStreamsSink.<String>builder()
+ *                         .setElementConverter(elementConverter)
+ *                         .setStreamName("your_stream_name")
+ *                         .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following 
defaults will be used:
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize} will be 200
+ *   <li>{@code maxInFlightRequests} will be 16
+ *   <li>{@code maxBufferedRequests} will be 10000
+ *   <li>{@code flushOnBufferSizeInBytes} will be 64MB i.e. {@code 64 * 1024 * 
1024}
+ *   <li>{@code maxTimeInBufferMS} will be 5000ms
+ *   <li>{@code failOnError} will be false
+ * </ul>
+ *
+ * @param <InputT> type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class KinesisDataStreamsSinkBuilder<InputT>
+        extends AsyncSinkBaseBuilder<
+                InputT, PutRecordsRequestEntry, 
KinesisDataStreamsSinkBuilder<InputT>> {
+
+    private static final int DEFAULT_MAX_BATCH_SIZE = 500;
+    private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 16;
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10000;
+    private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 5 * 1024 * 1024;
+    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+    private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 1 * 1024 * 1024;
+    private static final boolean DEFAULT_FAIL_ON_ERROR = false;
+
+    private Boolean failOnError;
+    private String streamName;
+    private Properties kinesisClientProperties;
+
+    KinesisDataStreamsSinkBuilder() {}
+
+    /**
+     * Sets the name of the KDS stream that the sink will connect to. There is 
no default for this
+     * parameter, therefore, this must be provided at sink creation time 
otherwise the build will
+     * fail.
+     *
+     * @param streamName the name of the stream
+     * @return {@link KinesisDataStreamsSinkBuilder} itself
+     */
+    public KinesisDataStreamsSinkBuilder<InputT> setStreamName(String 
streamName) {
+        this.streamName = streamName;
+        return this;
+    }
+
+    public KinesisDataStreamsSinkBuilder<InputT> setFailOnError(boolean 
failOnError) {
+        this.failOnError = failOnError;
+        return this;
+    }
+
+    public KinesisDataStreamsSinkBuilder<InputT> setKinesisClientProperties(
+            Properties kinesisClientProperties) {
+        this.kinesisClientProperties = kinesisClientProperties;
+        return this;
+    }
+
+    @Override
+    public KinesisDataStreamsSink<InputT> build() {
+        return new KinesisDataStreamsSink<>(
+                getElementConverter(),
+                getMaxBatchSize() == null ? DEFAULT_MAX_BATCH_SIZE : getMaxBatchSize(),
+                getMaxInFlightRequests() == null

Review comment:
       nit: You could use `Optional` here, which would remove the duplicate 
method invocations:
   
   ```
   Optional.ofNullable(getMaxInFlightRequests()).orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS)
   ```
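
To make the suggestion concrete, here is a minimal, self-contained sketch that compares the two forms. The class `BuilderDefaults` and its methods are hypothetical stand-ins for the builder, not code from this PR; only the default value 16 and the `Optional` call mirror the diff.

```java
import java.util.Optional;

/** Hypothetical stand-in for the builder; illustrates the two ways of applying a default. */
public class BuilderDefaults {
    public static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 16;

    // Nullable on purpose: null means "not configured by the user".
    private final Integer maxInFlightRequests;

    public BuilderDefaults(Integer maxInFlightRequests) {
        this.maxInFlightRequests = maxInFlightRequests;
    }

    public Integer getMaxInFlightRequests() {
        return maxInFlightRequests;
    }

    /** Ternary form, as in the diff: the getter is invoked twice. */
    public int resolveWithTernary() {
        return getMaxInFlightRequests() == null
                ? DEFAULT_MAX_IN_FLIGHT_REQUESTS
                : getMaxInFlightRequests();
    }

    /** Optional form: the getter is invoked once and the default is applied if it returned null. */
    public int resolveWithOptional() {
        return Optional.ofNullable(getMaxInFlightRequests())
                .orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS);
    }
}
```

Both methods return the same value for any input; the `Optional` form just avoids repeating the getter call for every parameter in `build()`.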

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
##########
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.config;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/** Configuration keys for AWS service usage. */
-@PublicEvolving
-public class AWSConfigConstants extends org.apache.flink.connector.aws.config.AWSConfigConstants {

Review comment:
       I think deleting this class is a backwards-incompatible change. Don't 
we need to retain it in the legacy connector?

##########
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilder.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+
+import java.util.Properties;
+
+/**
+ * Builder to construct {@link KinesisDataStreamsSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link 
KinesisDataStreamsSink} that
+ * writes String values to a Kinesis Data Streams stream named 
your_stream_here.
+ *
+ * <pre>{@code
+ * ElementConverter<String, PutRecordsRequestEntry> elementConverter =
+ *             KinesisDataStreamsSinkElementConverter.<String>builder()
+ *                     .setSerializationSchema(new SimpleStringSchema())
+ *                     .setPartitionKeyGenerator(element -> 
String.valueOf(element.hashCode()))
+ *                     .build();
+ *
+ * KinesisDataStreamsSink<String> kdsSink =
+ *                 KinesisDataStreamsSink.<String>builder()
+ *                         .setElementConverter(elementConverter)
+ *                         .setStreamName("your_stream_name")
+ *                         .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following 
defaults will be used:
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize} will be 200
+ *   <li>{@code maxInFlightRequests} will be 16
+ *   <li>{@code maxBufferedRequests} will be 10000
+ *   <li>{@code flushOnBufferSizeInBytes} will be 64MB i.e. {@code 64 * 1024 * 
1024}
+ *   <li>{@code maxTimeInBufferMS} will be 5000ms
+ *   <li>{@code failOnError} will be false

Review comment:
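       This Javadoc is outdated: the defaults listed here no longer match the 
constants in this builder (for example, `DEFAULT_MAX_BATCH_SIZE` is 500, not 
200).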

##########
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil;
+
+/** Defaults for {@link AWSKinesisDataStreamsUtil}. */
+@PublicEvolving
+public class AWSKinesisDataStreamsConfigConstants extends AWSConfigConstants {

Review comment:
       I believe the class that extends `AWSConfigConstants` is meant to go in 
the legacy connector for backward compatibility?
   

##########
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AsyncProducerConfigConstants.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+
+/** Optional producer specific configuration keys {@link AsyncSinkWriter}. */
+@PublicEvolving
+public class AsyncProducerConfigConstants extends 
AWSKinesisDataStreamsConfigConstants {
+    public static final String HTTP_CLIENT_MAX_CONCURRENCY =
+            "flink.stream.kinesis.http-client.max-concurrency";
+
+    public static final String HTTP_CLIENT_READ_TIMEOUT_MILLIS =
+            "flink.stream.kinesis.http-client.read-timeout";

Review comment:
       These configs look like they should go in the AWS base module?

##########
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.utils.AttributeMap;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.connector.kinesis.config.AsyncProducerConfigConstants.HTTP_CLIENT_MAX_CONCURRENCY;
+import static 
org.apache.flink.connector.kinesis.config.AsyncProducerConfigConstants.HTTP_CLIENT_READ_TIMEOUT_MILLIS;
+
+/**
+ * Sink writer created by {@link KinesisDataStreamsSink} to write to Kinesis 
Data Streams. More
+ * details on the operation of this sink writer may be found in the doc for 
{@link
+ * KinesisDataStreamsSink}. More details on the internals of this sink writer 
may be found in {@link
+ * AsyncSinkWriter}.
+ *
+ * <p>The {@link KinesisAsyncClient} used here may be configured in the 
standard way for the AWS SDK
+ * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} 
and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, 
PutRecordsRequestEntry> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataStreamsSinkWriter.class);
+
+    /* A counter for the total number of records that have encountered an 
error during put */
+    private final Counter numRecordsOutErrorsCounter;
+
+    /* Name of the stream in Kinesis Data Streams */
+    private final String streamName;
+
+    /* The sink writer metric group */
+    private final SinkWriterMetricGroup metrics;
+
+    /* The asynchronous Kinesis client - construction is by 
kinesisClientProperties */
+    private final KinesisAsyncClient client;
+
+    /* Flag to whether fatally fail any time we encounter an exception when 
persisting records */
+    private final boolean failOnError;
+
+    KinesisDataStreamsSinkWriter(
+            ElementConverter<InputT, PutRecordsRequestEntry> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            boolean failOnError,
+            String streamName,
+            Properties kinesisClientProperties) {
+        super(
+                elementConverter,
+                context,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes);
+        this.failOnError = failOnError;
+        this.streamName = streamName;
+        this.metrics = context.metricGroup();
+        this.numRecordsOutErrorsCounter = 
metrics.getNumRecordsOutErrorsCounter();
+        this.client = buildClient(kinesisClientProperties);
+    }
+
+    private KinesisAsyncClient buildClient(Properties kinesisClientProperties) 
{
+        final AttributeMap.Builder clientConfiguration =
+                
AttributeMap.builder().put(SdkHttpConfigurationOption.TCP_KEEPALIVE, true);
+
+        
Optional.ofNullable(kinesisClientProperties.getProperty(HTTP_CLIENT_MAX_CONCURRENCY))
+                .map(Integer::parseInt)
+                .ifPresent(
+                        integer ->
+                                clientConfiguration.put(
+                                        
SdkHttpConfigurationOption.MAX_CONNECTIONS, integer));
+
+        
Optional.ofNullable(kinesisClientProperties.getProperty(HTTP_CLIENT_READ_TIMEOUT_MILLIS))
+                .map(Integer::parseInt)
+                .map(Duration::ofMillis)
+                .ifPresent(
+                        timeout ->
+                                clientConfiguration.put(
+                                        
SdkHttpConfigurationOption.READ_TIMEOUT, timeout));
+
+        Optional.ofNullable(
+                        kinesisClientProperties.getProperty(
+                                AWSConfigConstants.TRUST_ALL_CERTIFICATES))
+                .map(Boolean::parseBoolean)
+                .ifPresent(
+                        bool ->
+                                clientConfiguration.put(
+                                        
SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, bool));
+
+        Optional.ofNullable(
+                        kinesisClientProperties.getProperty(
+                                AWSConfigConstants.HTTP_PROTOCOL_VERSION))
+                .map(Protocol::valueOf)
+                .ifPresent(
+                        protocol ->
+                                clientConfiguration.put(
+                                        SdkHttpConfigurationOption.PROTOCOL, protocol));

Review comment:
       As mentioned above, this bit seems common, not specific to Sink. Can it 
be moved somewhere reusable?
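
One possible shape for such a reusable helper, as a rough sketch only: the class `ClientPropertyMapper`, the method `applyIfPresent`, and the map keys `maxConnections` and `readTimeout` are hypothetical, and a plain `Map` stands in for the SDK's `AttributeMap` so the example needs nothing beyond the JDK. The two property keys are the ones defined in `AsyncProducerConfigConstants`.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical shared utility: turns optional string properties into typed client options. */
public class ClientPropertyMapper {

    /** Parses the property and passes it to the setter, but only if the key is present. */
    public static <T> void applyIfPresent(
            Properties props, String key, Function<String, T> parser, Consumer<T> setter) {
        Optional.ofNullable(props.getProperty(key)).map(parser).ifPresent(setter);
    }

    /** Collects the HTTP client options; a Map is used here instead of the SDK's AttributeMap. */
    public static Map<String, Object> toClientOptions(Properties props) {
        Map<String, Object> options = new HashMap<>();
        applyIfPresent(
                props,
                "flink.stream.kinesis.http-client.max-concurrency",
                Integer::parseInt,
                value -> options.put("maxConnections", value));
        applyIfPresent(
                props,
                "flink.stream.kinesis.http-client.read-timeout",
                text -> Duration.ofMillis(Long.parseLong(text)),
                value -> options.put("readTimeout", value));
        return options;
    }
}
```

With something along these lines in a shared module, each connector would only list its key, parser, and target option, instead of repeating the `Optional.ofNullable(...).map(...).ifPresent(...)` chain.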

##########
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkITCase.java
##########
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
+import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
+import 
org.apache.flink.streaming.connectors.kinesis.testutils.KinesaliteContainer;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StreamStatus;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ACCESS_KEY_ID;
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY;
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+/** IT cases for using Kinesis Data Streams Sink based on Kinesalite. */
+public class KinesisDataStreamsSinkITCase extends TestLogger {
+
+    private static final String DEFAULT_FIRST_SHARD_NAME = 
"shardId-000000000000";
+
+    private final ElementConverter<String, PutRecordsRequestEntry> 
elementConverter =
+            KinesisDataStreamsSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .setPartitionKeyGenerator(element -> 
String.valueOf(element.hashCode()))
+                    .build();
+
+    private final ElementConverter<String, PutRecordsRequestEntry>
+            partitionKeyTooLongElementConverter =
+                    KinesisDataStreamsSinkElementConverter.<String>builder()
+                            .setSerializationSchema(new SimpleStringSchema())
+                            .setPartitionKeyGenerator(element -> element)
+                            .build();
+
+    @ClassRule
+    public static final KinesaliteContainer KINESALITE =
+            new 
KinesaliteContainer(DockerImageName.parse(DockerImageVersions.KINESALITE))
+                    .withNetwork(Network.newNetwork())
+                    .withNetworkAliases("kinesalite");
+
+    private StreamExecutionEnvironment env;
+    private KinesisAsyncClient kinesisClient;
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
+
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        kinesisClient = KINESALITE.getHostClient();
+    }
+
+    @Test
+    public void 
elementsMaybeWrittenSuccessfullyToLocalInstanceWhenBatchSizeIsReached()
+            throws Exception {
+        new Scenario()
+                .withKinesaliteStreamName("test-stream-name-1")
+                .withSinkConnectionStreamName("test-stream-name-1")
+                .runScenario();
+    }
+
+    @Test
+    public void 
elementsBufferedAndTriggeredByTimeBasedFlushShouldBeFlushedIfSourcedIsKeptAlive()
+            throws Exception {
+        new Scenario()
+                .withNumberOfElementsToSend(10)
+                .withMaxBatchSize(100)
+                .withExpectedElements(10)
+                .withKinesaliteStreamName("test-stream-name-2")
+                .withSinkConnectionStreamName("test-stream-name-2")
+                .runScenario();
+    }
+
+    @Test
+    public void veryLargeMessagesSucceedInBeingPersisted() throws Exception {
+        new Scenario()
+                .withNumberOfElementsToSend(5)
+                .withSizeOfMessageBytes(2500)
+                .withMaxBatchSize(10)
+                .withExpectedElements(5)
+                .withKinesaliteStreamName("test-stream-name-3")
+                .withSinkConnectionStreamName("test-stream-name-3")
+                .runScenario();
+    }
+
+    @Test
+    public void 
multipleInFlightRequestsResultsInCorrectNumberOfElementsPersisted()
+            throws Exception {
+        new Scenario()
+                .withNumberOfElementsToSend(150)
+                .withSizeOfMessageBytes(2500)
+                .withBufferMaxTimeMS(2000)
+                .withMaxInflightReqs(10)
+                .withMaxBatchSize(20)
+                .withExpectedElements(150)
+                .withKinesaliteStreamName("test-stream-name-4")
+                .withSinkConnectionStreamName("test-stream-name-4")
+                .runScenario();
+    }
+
+    @Test
+    public void nonExistentStreamNameShouldResultInFailureInFailOnErrorIsOn() {
+        testJobFatalFailureTerminatesCorrectlyWithFailOnErrorFlagSetTo(true, 
"test-stream-name-5");
+    }
+
+    @Test
+    public void nonExistentStreamNameShouldResultInFailureInFailOnErrorIsOff() 
{
+        testJobFatalFailureTerminatesCorrectlyWithFailOnErrorFlagSetTo(false, 
"test-stream-name-6");
+    }
+
+    @Test
+    public void veryLargeMessagesFailGracefullyWithBrokenElementConverter() {
+        Throwable thrown =
+                assertThrows(
+                        JobExecutionException.class,
+                        () ->
+                                new Scenario()
+                                        .withNumberOfElementsToSend(5)
+                                        .withSizeOfMessageBytes(2500)
+                                        .withExpectedElements(5)
+                                        
.withKinesaliteStreamName("test-stream-name-7")
+                                        
.withSinkConnectionStreamName("test-stream-name-7")
+                                        
.withElementConverter(partitionKeyTooLongElementConverter)
+                                        .runScenario());
+        assertEquals(
+                "Encountered an exception while persisting records, not 
retrying due to {failOnError} being set.",
+                thrown.getCause().getCause().getMessage());
+    }
+
+    private class Scenario {
+        private int numberOfElementsToSend = 50;
+        private int sizeOfMessageBytes = 25;
+        private int bufferMaxTimeMS = 1000;
+        private int maxInflightReqs = 1;
+        private int maxBatchSize = 50;
+        private int expectedElements = 50;
+        private boolean failOnError = false;
+        private String kinesaliteStreamName;
+        private String sinkConnectionStreamName;
+        private ElementConverter<String, PutRecordsRequestEntry> 
elementConverter =
+                KinesisDataStreamsSinkITCase.this.elementConverter;
+
+        public void runScenario() throws Exception {
+            prepareStream(kinesaliteStreamName);
+
+            DataStream<String> stream =
+                    env.addSource(
+                                    new DataGeneratorSource<String>(
+                                            
RandomGenerator.stringGenerator(sizeOfMessageBytes),
+                                            100,
+                                            (long) numberOfElementsToSend))
+                            .returns(String.class);
+
+            Properties prop = new Properties();
+            prop.setProperty(AWS_ENDPOINT, KINESALITE.getHostEndpointUrl());
+            prop.setProperty(AWS_ACCESS_KEY_ID, KINESALITE.getAccessKey());
+            prop.setProperty(AWS_SECRET_ACCESS_KEY, KINESALITE.getSecretKey());
+            prop.setProperty(AWS_REGION, KINESALITE.getRegion().toString());
+            prop.setProperty(TRUST_ALL_CERTIFICATES, "true");
+            prop.setProperty(HTTP_PROTOCOL_VERSION, "HTTP1_1");
+
+            KinesisDataStreamsSink<String> kdsSink =
+                    KinesisDataStreamsSink.<String>builder()
+                            .setElementConverter(elementConverter)
+                            .setMaxTimeInBufferMS(bufferMaxTimeMS)
+                            .setMaxInFlightRequests(maxInflightReqs)
+                            .setMaxBatchSize(maxBatchSize)
+                            .setFailOnError(failOnError)
+                            .setMaxBufferedRequests(1000)
+                            .setStreamName(sinkConnectionStreamName)
+                            .setKinesisClientProperties(prop)
+                            .setFailOnError(true)
+                            .build();
+
+            stream.sinkTo(kdsSink);
+
+            env.execute("KDS Async Sink Example Program");
+
+            String shardIterator =
+                    kinesisClient
+                            .getShardIterator(
+                                    GetShardIteratorRequest.builder()
+                                            .shardId(DEFAULT_FIRST_SHARD_NAME)
+                                            .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
+                                            .streamName(kinesaliteStreamName)
+                                            .build())
+                            .get()
+                            .shardIterator();
+
+            assertEquals(

Review comment:
       [We should now be using](https://flink.apache.org/contributing/code-style-and-quality-common.html#testing) `AssertJ`. Please migrate the tests, for example:
   ```
   Assertions.assertThat(
                   kinesisClient
                           .getRecords(GetRecordsRequest.builder().shardIterator(shardIterator).build())
                           .get()
                           .records()
                           .size())
           .isEqualTo(expectedElements);
   ```

##########
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilderTest.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.junit.Test;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+/** Covers construction, defaults and sanity checking of KinesisDataStreamsSinkBuilder. */
+public class KinesisDataStreamsSinkBuilderTest {
+    private static final ElementConverter<String, PutRecordsRequestEntry>
+            ELEMENT_CONVERTER_PLACEHOLDER =
+                    KinesisDataStreamsSinkElementConverter.<String>builder()
+                            .setSerializationSchema(new SimpleStringSchema())
+                            .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
+                            .build();
+
+    @Test
+    public void elementConverterOfSinkMustBeSetWhenBuilt() {
+        Throwable thrown =
+                assertThrows(
+                        NullPointerException.class,
+                        () -> KinesisDataStreamsSink.builder().setStreamName("stream").build());
+        assertEquals(
+                "ElementConverter must be not null when initilizing the AsyncSinkBase.",
+                thrown.getMessage());

Review comment:
       [We should now be using](https://flink.apache.org/contributing/code-style-and-quality-common.html#testing) `AssertJ`. Can you migrate the tests to AssertJ? Example:
   
   ```
   Assertions.assertThatExceptionOfType(NullPointerException.class)
           .isThrownBy(() -> KinesisDataStreamsSink.builder().setStreamName("stream").build())
           .withMessageContaining("ElementConverter must be not null when initilizing the AsyncSinkBase.");
   ```

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxy.java
##########
@@ -39,8 +39,8 @@
 import java.util.Map;
 import java.util.Properties;
 
-import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_ENDPOINT;
-import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
 import static org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.getCredentialsProvider;

Review comment:
       This change is not backwards compatible

##########
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesaliteContainer.java
##########
@@ -17,24 +17,30 @@
 
 package org.apache.flink.streaming.connectors.kinesis.testutils;

Review comment:
       Can we move this to the new namespace instead of the legacy one?

##########
File path: 
flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil;
+
+/** Defaults for {@link AWSKinesisDataStreamsUtil}. */
+@PublicEvolving
+public class AWSKinesisDataStreamsConfigConstants extends AWSConfigConstants {
+
+    public static final boolean DEFAULT_LEGACY_CONNECTOR = false;
+
+    /** The identifier of the legacy connector. */
+    public static final String LEGACY_CONNECTOR = "aws.kinesis.legacy";
+}

Review comment:
       Should this go in the legacy connector module instead?

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
##########
@@ -47,22 +50,28 @@
     public static KinesisProxyV2Interface createKinesisProxyV2(final Properties configProps) {
         Preconditions.checkNotNull(configProps);
 
-        final ClientConfiguration clientConfiguration =
-                new ClientConfigurationFactory().getConfig();
+        final AttributeMap convertedProperties = AwsV2Util.convertProperties(configProps);
+        final AttributeMap.Builder clientConfiguration = AttributeMap.builder();
         populateDefaultValues(clientConfiguration);
 
         final SdkAsyncHttpClient httpClient =
-                AwsV2Util.createHttpClient(
-                        clientConfiguration, NettyNioAsyncHttpClient.builder(), configProps);
+                AWSGeneralUtil.createAsyncHttpClient(
+                        convertedProperties.merge(clientConfiguration.build()),
+                        NettyNioAsyncHttpClient.builder());
         final FanOutRecordPublisherConfiguration configuration =
                 new FanOutRecordPublisherConfiguration(configProps, emptyList());
+
+        Properties legacyConfigProps = new Properties(configProps);
+        legacyConfigProps.setProperty(
+                AWSKinesisDataStreamsConfigConstants.LEGACY_CONNECTOR, Boolean.toString(true));

Review comment:
       Maybe instead of setting a boolean here, we can just set the user agent 
header? That way we can remove this logic from the new code.
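
A rough sketch of the idea, not a definitive implementation: the legacy module writes its own user agent value into the properties, and the new module only reads whatever value it is given. The property key, the placeholder user agent strings and the class name are hypothetical and do not exist in Flink or the AWS SDK.

```java
import java.util.Properties;

/** Hypothetical sketch; none of these names exist in Flink or the AWS SDK. */
class UserAgentSketch {
    // Assumed property key, used only for illustration.
    static final String USER_AGENT_KEY = "aws.kinesis.client.user-agent";
    // Placeholder default for the new connector.
    static final String DEFAULT_USER_AGENT = "new-connector";

    /** New-connector side: use the supplied user agent, else fall back to the default. */
    static String resolveUserAgent(Properties config) {
        return config.getProperty(USER_AGENT_KEY, DEFAULT_USER_AGENT);
    }

    /** Legacy side: copy the caller's config and set the legacy user agent on the copy. */
    static Properties withLegacyUserAgent(Properties configProps) {
        Properties copy = new Properties();
        copy.putAll(configProps);
        copy.setProperty(USER_AGENT_KEY, "legacy-connector");
        return copy;
    }
}
```

With this shape the new code needs no knowledge of a legacy flag; it just forwards the resolved string to wherever the user agent header is configured.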

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -101,7 +101,7 @@ public static void validateConsumerConfiguration(Properties config, List<String>
         }
 
         if (!(config.containsKey(AWSConfigConstants.AWS_REGION)
-                || config.containsKey(ConsumerConfigConstants.AWS_ENDPOINT))) {
+                || config.containsKey(AWSConfigConstants.AWS_ENDPOINT))) {

Review comment:
       Please make sure this change is backwards compatible. Remember that 
customer code may reference `ConsumerConfigConstants.AWS_ENDPOINT`.
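
One possible way to keep this compatible, sketched under the assumption that the legacy `ConsumerConfigConstants` can keep extending the relocated `AWSConfigConstants`. The classes below are simplified stand-ins rather than the real Flink classes, and the key values are assumed:

```java
import java.util.Properties;

/** Simplified stand-in for the relocated constants class (not the real Flink class). */
class AWSConfigConstants {
    public static final String AWS_REGION = "aws.region"; // assumed value
    public static final String AWS_ENDPOINT = "aws.endpoint"; // assumed value
}

/** Legacy class keeps inheriting the constants, so existing references still compile. */
class ConsumerConfigConstants extends AWSConfigConstants {}

class BackwardsCompatibilityCheck {
    /** Mirrors the region-or-endpoint check from validateConsumerConfiguration. */
    static boolean hasRegionOrEndpoint(Properties config) {
        return config.containsKey(AWSConfigConstants.AWS_REGION)
                || config.containsKey(AWSConfigConstants.AWS_ENDPOINT);
    }
}
```

Because static fields are inherited, `ConsumerConfigConstants.AWS_ENDPOINT` resolves to the same string as `AWSConfigConstants.AWS_ENDPOINT`, so properties built with the old reference still pass the check.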




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.