dannycranmer commented on a change in pull request #17345: URL: https://github.com/apache/flink/pull/17345#discussion_r763889383
########## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsException.java ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.sink; + +/** + * A {@link RuntimeException} wrapper indicating the exception was thrown from the Kinesis Data + * Streams Sink. + */ +class KinesisDataStreamsException extends RuntimeException { + + public KinesisDataStreamsException(final String message) { + super(message); + } + + public KinesisDataStreamsException(final String message, final Throwable cause) { + super(message, cause); + } + + /** + * When the flag {@code failOnError} is set in {@link KinesisDataStreamsSinkWriter}, this + * exception is raised as soon as any exception occurs when KDS is written to. + */ + static class KinesisDataStreamsFailFastException extends KinesisDataStreamsException { Review comment: nit: The name of this class is not sitting well with me. I think something more general like `KinesisDataStreamsRuntimeException` seems like a better fit. Thoughts? 
########## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilder.java ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; + +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; + +import java.util.Properties; + +/** + * Builder to construct {@link KinesisDataStreamsSink}. + * + * <p>The following example shows the minimum setup to create a {@link KinesisDataStreamsSink} that + * writes String values to a Kinesis Data Streams stream named your_stream_here. 
+ * + * <pre>{@code + * ElementConverter<String, PutRecordsRequestEntry> elementConverter = + * KinesisDataStreamsSinkElementConverter.<String>builder() + * .setSerializationSchema(new SimpleStringSchema()) + * .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) + * .build(); + * + * KinesisDataStreamsSink<String> kdsSink = + * KinesisDataStreamsSink.<String>builder() + * .setElementConverter(elementConverter) + * .setStreamName("your_stream_name") + * .build(); + * }</pre> + * + * <p>If the following parameters are not set in this builder, the following defaults will be used: + * + * <ul> + * <li>{@code maxBatchSize} will be 200 + * <li>{@code maxInFlightRequests} will be 16 + * <li>{@code maxBufferedRequests} will be 10000 + * <li>{@code flushOnBufferSizeInBytes} will be 64MB i.e. {@code 64 * 1024 * 1024} + * <li>{@code maxTimeInBufferMS} will be 5000ms + * <li>{@code failOnError} will be false + * </ul> + * + * @param <InputT> type of elements that should be persisted in the destination + */ +@PublicEvolving +public class KinesisDataStreamsSinkBuilder<InputT> + extends AsyncSinkBaseBuilder< + InputT, PutRecordsRequestEntry, KinesisDataStreamsSinkBuilder<InputT>> { + + private static final int DEFAULT_MAX_BATCH_SIZE = 500; + private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 16; + private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10000; + private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 5 * 1024 * 1024; + private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000; + private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 1 * 1024 * 1024; + private static final boolean DEFAULT_FAIL_ON_ERROR = false; + + private Boolean failOnError; + private String streamName; + private Properties kinesisClientProperties; + + KinesisDataStreamsSinkBuilder() {} + + /** + * Sets the name of the KDS stream that the sink will connect to. 
There is no default for this + * parameter, therefore, this must be provided at sink creation time otherwise the build will + * fail. + * + * @param streamName the name of the stream + * @return {@link KinesisDataStreamsSinkBuilder} itself + */ + public KinesisDataStreamsSinkBuilder<InputT> setStreamName(String streamName) { + this.streamName = streamName; + return this; + } + + public KinesisDataStreamsSinkBuilder<InputT> setFailOnError(boolean failOnError) { + this.failOnError = failOnError; + return this; + } + + public KinesisDataStreamsSinkBuilder<InputT> setKinesisClientProperties( + Properties kinesisClientProperties) { + this.kinesisClientProperties = kinesisClientProperties; + return this; + } + + @Override + public KinesisDataStreamsSink<InputT> build() { + return new KinesisDataStreamsSink<>( + getElementConverter(), + getMaxBatchSize() == null ? DEFAULT_MAX_BATCH_SIZE : getMaxBatchSize(), + getMaxInFlightRequests() == null Review comment: nit: You could use optionals here, this would remove duplicate method invocations ``` Optional.ofNullable(getMaxInFlightRequests()).orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS) ``` ########## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java ########## @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kinesis.config; - -import org.apache.flink.annotation.PublicEvolving; - -/** Configuration keys for AWS service usage. */ -@PublicEvolving -public class AWSConfigConstants extends org.apache.flink.connector.aws.config.AWSConfigConstants { Review comment: I think deleting this is a backwards-incompatible change: this class is `@PublicEvolving`, so existing users of the legacy connector may reference `org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants` directly. Should we retain it in the legacy connector? ########## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilder.java ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.kinesis.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; + +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; + +import java.util.Properties; + +/** + * Builder to construct {@link KinesisDataStreamsSink}. + * + * <p>The following example shows the minimum setup to create a {@link KinesisDataStreamsSink} that + * writes String values to a Kinesis Data Streams stream named your_stream_here. + * + * <pre>{@code + * ElementConverter<String, PutRecordsRequestEntry> elementConverter = + * KinesisDataStreamsSinkElementConverter.<String>builder() + * .setSerializationSchema(new SimpleStringSchema()) + * .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) + * .build(); + * + * KinesisDataStreamsSink<String> kdsSink = + * KinesisDataStreamsSink.<String>builder() + * .setElementConverter(elementConverter) + * .setStreamName("your_stream_name") + * .build(); + * }</pre> + * + * <p>If the following parameters are not set in this builder, the following defaults will be used: + * + * <ul> + * <li>{@code maxBatchSize} will be 200 + * <li>{@code maxInFlightRequests} will be 16 + * <li>{@code maxBufferedRequests} will be 10000 + * <li>{@code flushOnBufferSizeInBytes} will be 64MB i.e. {@code 64 * 1024 * 1024} + * <li>{@code maxTimeInBufferMS} will be 5000ms + * <li>{@code failOnError} will be false Review comment: This comment is outdated: the listed defaults no longer match the constants in this builder. `DEFAULT_MAX_BATCH_SIZE` is 500, not 200. `DEFAULT_MAX_BATCH_SIZE_IN_B` is 5MB (`5 * 1024 * 1024`), not 64MB. `DEFAULT_MAX_RECORD_SIZE_IN_B` (1MB) is not listed at all. Also, the example text says `your_stream_here`, but the code uses `your_stream_name`. ########## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java ########## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.config; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.aws.config.AWSConfigConstants; +import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil; + +/** Defaults for {@link AWSKinesisDataStreamsUtil}. */ +@PublicEvolving +public class AWSKinesisDataStreamsConfigConstants extends AWSConfigConstants { Review comment: I believe the class that extends `AWSConfigConstants` is meant to go in the legacy connector for backward compatibility? ########## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AsyncProducerConfigConstants.java ########## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.config; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; + +/** Optional producer specific configuration keys {@link AsyncSinkWriter}. */ +@PublicEvolving +public class AsyncProducerConfigConstants extends AWSKinesisDataStreamsConfigConstants { + public static final String HTTP_CLIENT_MAX_CONCURRENCY = + "flink.stream.kinesis.http-client.max-concurrency"; + + public static final String HTTP_CLIENT_READ_TIMEOUT_MILLIS = + "flink.stream.kinesis.http-client.read-timeout"; Review comment: These HTTP client configs look generic rather than Kinesis-specific. Should they go in the AWS base module instead? ########## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java ########## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.sink; + +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.connector.aws.config.AWSConfigConstants; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.http.SdkHttpConfigurationOption; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.utils.AttributeMap; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.Consumer; + +import static org.apache.flink.connector.kinesis.config.AsyncProducerConfigConstants.HTTP_CLIENT_MAX_CONCURRENCY; +import static 
org.apache.flink.connector.kinesis.config.AsyncProducerConfigConstants.HTTP_CLIENT_READ_TIMEOUT_MILLIS; + +/** + * Sink writer created by {@link KinesisDataStreamsSink} to write to Kinesis Data Streams. More + * details on the operation of this sink writer may be found in the doc for {@link + * KinesisDataStreamsSink}. More details on the internals of this sink writer may be found in {@link + * AsyncSinkWriter}. + * + * <p>The {@link KinesisAsyncClient} used here may be configured in the standard way for the AWS SDK + * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code + * AWS_SECRET_ACCESS_KEY} through environment variables etc. + */ +class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> { + private static final Logger LOG = LoggerFactory.getLogger(KinesisDataStreamsSinkWriter.class); + + /* A counter for the total number of records that have encountered an error during put */ + private final Counter numRecordsOutErrorsCounter; + + /* Name of the stream in Kinesis Data Streams */ + private final String streamName; + + /* The sink writer metric group */ + private final SinkWriterMetricGroup metrics; + + /* The asynchronous Kinesis client - construction is by kinesisClientProperties */ + private final KinesisAsyncClient client; + + /* Flag to whether fatally fail any time we encounter an exception when persisting records */ + private final boolean failOnError; + + KinesisDataStreamsSinkWriter( + ElementConverter<InputT, PutRecordsRequestEntry> elementConverter, + Sink.InitContext context, + int maxBatchSize, + int maxInFlightRequests, + int maxBufferedRequests, + long maxBatchSizeInBytes, + long maxTimeInBufferMS, + long maxRecordSizeInBytes, + boolean failOnError, + String streamName, + Properties kinesisClientProperties) { + super( + elementConverter, + context, + maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBatchSizeInBytes, + maxTimeInBufferMS, + 
maxRecordSizeInBytes); + this.failOnError = failOnError; + this.streamName = streamName; + this.metrics = context.metricGroup(); + this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter(); + this.client = buildClient(kinesisClientProperties); + } + + private KinesisAsyncClient buildClient(Properties kinesisClientProperties) { + final AttributeMap.Builder clientConfiguration = + AttributeMap.builder().put(SdkHttpConfigurationOption.TCP_KEEPALIVE, true); + + Optional.ofNullable(kinesisClientProperties.getProperty(HTTP_CLIENT_MAX_CONCURRENCY)) + .map(Integer::parseInt) + .ifPresent( + integer -> + clientConfiguration.put( + SdkHttpConfigurationOption.MAX_CONNECTIONS, integer)); + + Optional.ofNullable(kinesisClientProperties.getProperty(HTTP_CLIENT_READ_TIMEOUT_MILLIS)) + .map(Integer::parseInt) + .map(Duration::ofMillis) + .ifPresent( + timeout -> + clientConfiguration.put( + SdkHttpConfigurationOption.READ_TIMEOUT, timeout)); + + Optional.ofNullable( + kinesisClientProperties.getProperty( + AWSConfigConstants.TRUST_ALL_CERTIFICATES)) + .map(Boolean::parseBoolean) + .ifPresent( + bool -> + clientConfiguration.put( + SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, bool)); + + Optional.ofNullable( + kinesisClientProperties.getProperty( + AWSConfigConstants.HTTP_PROTOCOL_VERSION)) + .map(Protocol::valueOf) + .ifPresent( + protocol -> + clientConfiguration.put( + SdkHttpConfigurationOption.PROTOCOL, protocol)); Review comment: As mentioned above, this bit seems common, not specific to Sink. Can it be moved somewhere reusable? ########## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkITCase.java ########## @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.sink; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource; +import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator; +import org.apache.flink.streaming.connectors.kinesis.testutils.KinesaliteContainer; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.testcontainers.containers.Network; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import 
software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import software.amazon.awssdk.services.kinesis.model.StreamStatus; + +import java.time.Duration; +import java.util.Properties; + +import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ACCESS_KEY_ID; +import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT; +import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION; +import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY; +import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION; +import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +/** IT cases for using Kinesis Data Streams Sink based on Kinesalite. */ +public class KinesisDataStreamsSinkITCase extends TestLogger { + + private static final String DEFAULT_FIRST_SHARD_NAME = "shardId-000000000000"; + + private final ElementConverter<String, PutRecordsRequestEntry> elementConverter = + KinesisDataStreamsSinkElementConverter.<String>builder() + .setSerializationSchema(new SimpleStringSchema()) + .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) + .build(); + + private final ElementConverter<String, PutRecordsRequestEntry> + partitionKeyTooLongElementConverter = + KinesisDataStreamsSinkElementConverter.<String>builder() + .setSerializationSchema(new SimpleStringSchema()) + .setPartitionKeyGenerator(element -> element) + .build(); + + @ClassRule + public static final KinesaliteContainer KINESALITE = + new KinesaliteContainer(DockerImageName.parse(DockerImageVersions.KINESALITE)) + .withNetwork(Network.newNetwork()) + .withNetworkAliases("kinesalite"); + + private StreamExecutionEnvironment env; + private KinesisAsyncClient 
kinesisClient; + + @Before + public void setUp() throws Exception { + System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false"); + + env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + kinesisClient = KINESALITE.getHostClient(); + } + + @Test + public void elementsMaybeWrittenSuccessfullyToLocalInstanceWhenBatchSizeIsReached() + throws Exception { + new Scenario() + .withKinesaliteStreamName("test-stream-name-1") + .withSinkConnectionStreamName("test-stream-name-1") + .runScenario(); + } + + @Test + public void elementsBufferedAndTriggeredByTimeBasedFlushShouldBeFlushedIfSourcedIsKeptAlive() + throws Exception { + new Scenario() + .withNumberOfElementsToSend(10) + .withMaxBatchSize(100) + .withExpectedElements(10) + .withKinesaliteStreamName("test-stream-name-2") + .withSinkConnectionStreamName("test-stream-name-2") + .runScenario(); + } + + @Test + public void veryLargeMessagesSucceedInBeingPersisted() throws Exception { + new Scenario() + .withNumberOfElementsToSend(5) + .withSizeOfMessageBytes(2500) + .withMaxBatchSize(10) + .withExpectedElements(5) + .withKinesaliteStreamName("test-stream-name-3") + .withSinkConnectionStreamName("test-stream-name-3") + .runScenario(); + } + + @Test + public void multipleInFlightRequestsResultsInCorrectNumberOfElementsPersisted() + throws Exception { + new Scenario() + .withNumberOfElementsToSend(150) + .withSizeOfMessageBytes(2500) + .withBufferMaxTimeMS(2000) + .withMaxInflightReqs(10) + .withMaxBatchSize(20) + .withExpectedElements(150) + .withKinesaliteStreamName("test-stream-name-4") + .withSinkConnectionStreamName("test-stream-name-4") + .runScenario(); + } + + @Test + public void nonExistentStreamNameShouldResultInFailureInFailOnErrorIsOn() { + testJobFatalFailureTerminatesCorrectlyWithFailOnErrorFlagSetTo(true, "test-stream-name-5"); + } + + @Test + public void nonExistentStreamNameShouldResultInFailureInFailOnErrorIsOff() { + 
testJobFatalFailureTerminatesCorrectlyWithFailOnErrorFlagSetTo(false, "test-stream-name-6"); + } + + @Test + public void veryLargeMessagesFailGracefullyWithBrokenElementConverter() { + Throwable thrown = + assertThrows( + JobExecutionException.class, + () -> + new Scenario() + .withNumberOfElementsToSend(5) + .withSizeOfMessageBytes(2500) + .withExpectedElements(5) + .withKinesaliteStreamName("test-stream-name-7") + .withSinkConnectionStreamName("test-stream-name-7") + .withElementConverter(partitionKeyTooLongElementConverter) + .runScenario()); + assertEquals( + "Encountered an exception while persisting records, not retrying due to {failOnError} being set.", + thrown.getCause().getCause().getMessage()); + } + + private class Scenario { + private int numberOfElementsToSend = 50; + private int sizeOfMessageBytes = 25; + private int bufferMaxTimeMS = 1000; + private int maxInflightReqs = 1; + private int maxBatchSize = 50; + private int expectedElements = 50; + private boolean failOnError = false; + private String kinesaliteStreamName; + private String sinkConnectionStreamName; + private ElementConverter<String, PutRecordsRequestEntry> elementConverter = + KinesisDataStreamsSinkITCase.this.elementConverter; + + public void runScenario() throws Exception { + prepareStream(kinesaliteStreamName); + + DataStream<String> stream = + env.addSource( + new DataGeneratorSource<String>( + RandomGenerator.stringGenerator(sizeOfMessageBytes), + 100, + (long) numberOfElementsToSend)) + .returns(String.class); + + Properties prop = new Properties(); + prop.setProperty(AWS_ENDPOINT, KINESALITE.getHostEndpointUrl()); + prop.setProperty(AWS_ACCESS_KEY_ID, KINESALITE.getAccessKey()); + prop.setProperty(AWS_SECRET_ACCESS_KEY, KINESALITE.getSecretKey()); + prop.setProperty(AWS_REGION, KINESALITE.getRegion().toString()); + prop.setProperty(TRUST_ALL_CERTIFICATES, "true"); + prop.setProperty(HTTP_PROTOCOL_VERSION, "HTTP1_1"); + + KinesisDataStreamsSink<String> kdsSink = + 
KinesisDataStreamsSink.<String>builder() + .setElementConverter(elementConverter) + .setMaxTimeInBufferMS(bufferMaxTimeMS) + .setMaxInFlightRequests(maxInflightReqs) + .setMaxBatchSize(maxBatchSize) + .setFailOnError(failOnError) + .setMaxBufferedRequests(1000) + .setStreamName(sinkConnectionStreamName) + .setKinesisClientProperties(prop) + .setFailOnError(true) + .build(); + + stream.sinkTo(kdsSink); + + env.execute("KDS Async Sink Example Program"); + + String shardIterator = + kinesisClient + .getShardIterator( + GetShardIteratorRequest.builder() + .shardId(DEFAULT_FIRST_SHARD_NAME) + .shardIteratorType(ShardIteratorType.TRIM_HORIZON) + .streamName(kinesaliteStreamName) + .build()) + .get() + .shardIterator(); + + assertEquals( Review comment: [We should now be using](https://flink.apache.org/contributing/code-style-and-quality-common.html#testing) `AssertJ`, please migrate tests, example: ``` Assertions.assertThat(kinesisClient .getRecords(GetRecordsRequest.builder().shardIterator(shardIterator).build()) .get() .records() .size()) .isEqualTo(expectedElements); ``` ########## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilderTest.java ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.sink; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +import org.junit.Test; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +/** Covers construction, defaults and sanity checking of KinesisDataStreamsSinkBuilder. */ +public class KinesisDataStreamsSinkBuilderTest { + private static final ElementConverter<String, PutRecordsRequestEntry> + ELEMENT_CONVERTER_PLACEHOLDER = + KinesisDataStreamsSinkElementConverter.<String>builder() + .setSerializationSchema(new SimpleStringSchema()) + .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) + .build(); + + @Test + public void elementConverterOfSinkMustBeSetWhenBuilt() { + Throwable thrown = + assertThrows( + NullPointerException.class, + () -> KinesisDataStreamsSink.builder().setStreamName("stream").build()); + assertEquals( + "ElementConverter must be not null when initilizing the AsyncSinkBase.", + thrown.getMessage()); Review comment: [We should now be using](https://flink.apache.org/contributing/code-style-and-quality-common.html#testing) `AssertJ`. Can you migrate the tests to AssertJ?
Example: ``` Assertions.assertThatExceptionOfType(NullPointerException.class) .isThrownBy(() -> KinesisDataStreamsSink.builder().setStreamName("stream").build()) .withMessageContaining("ElementConverter must be not null when initilizing the AsyncSinkBase."); ``` ########## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxy.java ########## @@ -39,8 +39,8 @@ import java.util.Map; import java.util.Properties; -import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_ENDPOINT; -import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION; +import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT; +import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION; import static org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.getCredentialsProvider; Review comment: This change is not backwards compatible ########## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesaliteContainer.java ########## @@ -17,24 +17,30 @@ package org.apache.flink.streaming.connectors.kinesis.testutils; Review comment: Can we move this to the new namespace instead of legacy? ########## File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java ########## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.config; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.aws.config.AWSConfigConstants; +import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil; + +/** Defaults for {@link AWSKinesisDataStreamsUtil}. */ +@PublicEvolving +public class AWSKinesisDataStreamsConfigConstants extends AWSConfigConstants { + + public static final boolean DEFAULT_LEGACY_CONNECTOR = false; + + /** The identifier of the legacy connector. */ + public static final String LEGACY_CONNECTOR = "aws.kinesis.legacy"; +} Review comment: Should this go in the legacy connector module instead? 
########## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java ########## @@ -47,22 +50,28 @@ public static KinesisProxyV2Interface createKinesisProxyV2(final Properties configProps) { Preconditions.checkNotNull(configProps); - final ClientConfiguration clientConfiguration = - new ClientConfigurationFactory().getConfig(); + final AttributeMap convertedProperties = AwsV2Util.convertProperties(configProps); + final AttributeMap.Builder clientConfiguration = AttributeMap.builder(); populateDefaultValues(clientConfiguration); final SdkAsyncHttpClient httpClient = - AwsV2Util.createHttpClient( - clientConfiguration, NettyNioAsyncHttpClient.builder(), configProps); + AWSGeneralUtil.createAsyncHttpClient( + convertedProperties.merge(clientConfiguration.build()), + NettyNioAsyncHttpClient.builder()); final FanOutRecordPublisherConfiguration configuration = new FanOutRecordPublisherConfiguration(configProps, emptyList()); + + Properties legacyConfigProps = new Properties(configProps); + legacyConfigProps.setProperty( + AWSKinesisDataStreamsConfigConstants.LEGACY_CONNECTOR, Boolean.toString(true)); Review comment: Maybe instead of setting a boolean here, we can just set the user agent header? 
This way we can remove this logic from the new code. ########## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ########## @@ -101,7 +101,7 @@ public static void validateConsumerConfiguration(Properties config, List<String> } if (!(config.containsKey(AWSConfigConstants.AWS_REGION) - || config.containsKey(ConsumerConfigConstants.AWS_ENDPOINT))) { + || config.containsKey(AWSConfigConstants.AWS_ENDPOINT))) { Review comment: Please make sure this change is backwards compatible; remember that customer code may be referencing `ConsumerConfigConstants.AWS_ENDPOINT`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
