dannycranmer commented on a change in pull request #17345: URL: https://github.com/apache/flink/pull/17345#discussion_r741246342
########## File path: flink-connectors/flink-connector-aws/pom.xml ########## @@ -0,0 +1,130 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.15-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-aws</artifactId> + <name>Flink : Connectors : AWS</name> + <properties> + <aws.sdkv2.version>2.17.52</aws.sdkv2.version> + </properties> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-bridge</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <optional>true</optional> Review comment: I am not sure how it makes sense to be `provided` AND `optional`, can you explain the reasoning behind this? ########## File path: flink-connectors/flink-connector-aws/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.sink; + +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.kinesis.sink.util.KinesisUtil; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.ClientConfigurationFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.Consumer; + +/** + * Sink writer created by {@link KinesisDataStreamsSink} to write to Kinesis Data Streams. More + * details on the operation of this sink writer may be found in the doc for {@link + * KinesisDataStreamsSink}. More details on the internals of this sink writer may be found in {@link + * AsyncSinkWriter}. + * + * <p>The {@link KinesisAsyncClient} used here may be configured in the standard way for the AWS SDK + * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code + * AWS_SECRET_ACCESS_KEY} through environment variables etc. + */ +class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> { + + private static final String TOTAL_FULLY_SUCCESSFUL_FLUSHES_METRIC = + "totalFullySuccessfulFlushes"; + private static final String TOTAL_PARTIALLY_SUCCESSFUL_FLUSHES_METRIC = + "totalPartiallySuccessfulFlushes"; + private static final String TOTAL_FULLY_FAILED_FLUSHES_METRIC = "totalFullyFailedFlushes"; + private Counter totalFullySuccessfulFlushesCounter; + private Counter totalPartiallySuccessfulFlushesCounter; + private Counter totalFullyFailedFlushesCounter; + private Counter numRecordsOutErrorsCounter; + + /* Name of the stream in Kinesis Data Streams */ + private final String streamName; + + /* The sink writer metric group */ + private final SinkWriterMetricGroup metrics; + + /* The asynchronous Kinesis client - construction is by kinesisClientProperties */ + private final KinesisAsyncClient client; + + /* Flag to whether fatally fail any time we encounter an exception when persisting records */ + private final boolean failOnError; + + private static final Logger LOG = LoggerFactory.getLogger(KinesisDataStreamsSinkWriter.class); + + KinesisDataStreamsSinkWriter( + ElementConverter<InputT, PutRecordsRequestEntry> elementConverter, + Sink.InitContext context, + int maxBatchSize, + int maxInFlightRequests, + int maxBufferedRequests, + long flushOnBufferSizeInBytes, + long maxTimeInBufferMS, + boolean failOnError, + String streamName, + Properties kinesisClientProperties) { + super( + elementConverter, + context, + maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + flushOnBufferSizeInBytes, + maxTimeInBufferMS); + this.failOnError = failOnError; + this.streamName = streamName; + this.metrics = context.metricGroup(); + initMetricsGroup(); + this.client = buildClient(kinesisClientProperties); + } + + private KinesisAsyncClient buildClient(Properties kinesisClientProperties) { + final ClientConfiguration clientConfiguration = + new ClientConfigurationFactory().getConfig(); + clientConfiguration.setUseTcpKeepAlive(true); + + final SdkAsyncHttpClient httpClient = + KinesisUtil.createHttpClient( + clientConfiguration, + NettyNioAsyncHttpClient.builder(), + kinesisClientProperties); + + return KinesisUtil.createKinesisAsyncClient( + kinesisClientProperties, clientConfiguration, httpClient); + } + + @Override + protected void submitRequestEntries( + List<PutRecordsRequestEntry> requestEntries, + Consumer<Collection<PutRecordsRequestEntry>> requestResult, + Consumer<Exception> exceptionConsumer) { + + PutRecordsRequest batchRequest = + PutRecordsRequest.builder().records(requestEntries).streamName(streamName).build(); + + LOG.trace("Request to submit {} entries to KDS using KDS Sink.", requestEntries.size()); + + CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest); + + future.whenComplete( + (response, err) -> { + if (err != null) { + LOG.warn( + "KDS Sink failed to persist {} entries to KDS", + requestEntries.size(), + err); + totalFullyFailedFlushesCounter.inc(); + numRecordsOutErrorsCounter.inc(requestEntries.size()); + + if (isRetryable(err, exceptionConsumer)) { + requestResult.accept(requestEntries); + } + return; + } + + if (response.failedRecordCount() > 0) { + LOG.warn( + "KDS Sink failed to persist {} entries to KDS", + response.failedRecordCount()); + totalPartiallySuccessfulFlushesCounter.inc(); + numRecordsOutErrorsCounter.inc(response.failedRecordCount()); + + if (failOnError) { + exceptionConsumer.accept( + new KinesisDataStreamsException + .KinesisDataStreamsFailFastException()); + return; + } + List<PutRecordsRequestEntry> failedRequestEntries = + new ArrayList<>(response.failedRecordCount()); + List<PutRecordsResultEntry> records = response.records(); + + for (int i = 0; i < records.size(); i++) { + if (records.get(i).errorCode() != null) { + failedRequestEntries.add(requestEntries.get(i)); + } + } + + requestResult.accept(failedRequestEntries); + } else { + totalFullySuccessfulFlushesCounter.inc(); + requestResult.accept(Collections.emptyList()); + } + }); + } + + @Override + protected long getSizeInBytes(PutRecordsRequestEntry requestEntry) { + return requestEntry.data().asByteArrayUnsafe().length; + } + + private void initMetricsGroup() { + totalFullySuccessfulFlushesCounter = metrics.counter(TOTAL_FULLY_SUCCESSFUL_FLUSHES_METRIC); + totalPartiallySuccessfulFlushesCounter = + metrics.counter(TOTAL_PARTIALLY_SUCCESSFUL_FLUSHES_METRIC); + totalFullyFailedFlushesCounter = metrics.counter(TOTAL_FULLY_FAILED_FLUSHES_METRIC); + numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter(); + } + + private boolean isRetryable(Throwable err, Consumer<Exception> exceptionConsumer) { + if (err instanceof CompletionException + && err.getCause() instanceof ResourceNotFoundException) { + exceptionConsumer.accept( + new KinesisDataStreamsException( + "Encountered an exception that may not be retried ", err)); Review comment: nit: "Encountered non-recoverable exception" ########## File path: flink-connectors/flink-connector-aws/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.sink; + +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.kinesis.sink.util.KinesisUtil; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.ClientConfigurationFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.Consumer; + +/** + * Sink writer created by {@link KinesisDataStreamsSink} to write to Kinesis Data Streams. More + * details on the operation of this sink writer may be found in the doc for {@link + * KinesisDataStreamsSink}. More details on the internals of this sink writer may be found in {@link + * AsyncSinkWriter}. + * + * <p>The {@link KinesisAsyncClient} used here may be configured in the standard way for the AWS SDK + * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code + * AWS_SECRET_ACCESS_KEY} through environment variables etc. + */ +class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> { + + private static final String TOTAL_FULLY_SUCCESSFUL_FLUSHES_METRIC = + "totalFullySuccessfulFlushes"; + private static final String TOTAL_PARTIALLY_SUCCESSFUL_FLUSHES_METRIC = + "totalPartiallySuccessfulFlushes"; + private static final String TOTAL_FULLY_FAILED_FLUSHES_METRIC = "totalFullyFailedFlushes"; + private Counter totalFullySuccessfulFlushesCounter; + private Counter totalPartiallySuccessfulFlushesCounter; + private Counter totalFullyFailedFlushesCounter; + private Counter numRecordsOutErrorsCounter; + + /* Name of the stream in Kinesis Data Streams */ + private final String streamName; + + /* The sink writer metric group */ + private final SinkWriterMetricGroup metrics; + + /* The asynchronous Kinesis client - construction is by kinesisClientProperties */ + private final KinesisAsyncClient client; + + /* Flag to whether fatally fail any time we encounter an exception when persisting records */ + private final boolean failOnError; + + private static final Logger LOG = LoggerFactory.getLogger(KinesisDataStreamsSinkWriter.class); + + KinesisDataStreamsSinkWriter( + ElementConverter<InputT, PutRecordsRequestEntry> elementConverter, + Sink.InitContext context, + int maxBatchSize, + int maxInFlightRequests, + int maxBufferedRequests, + long flushOnBufferSizeInBytes, + long maxTimeInBufferMS, + boolean failOnError, + String streamName, + Properties kinesisClientProperties) { + super( + elementConverter, + context, + maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + flushOnBufferSizeInBytes, + maxTimeInBufferMS); + this.failOnError = failOnError; + this.streamName = streamName; + this.metrics = context.metricGroup(); + initMetricsGroup(); + this.client = buildClient(kinesisClientProperties); + } + + private KinesisAsyncClient buildClient(Properties kinesisClientProperties) { + final ClientConfiguration clientConfiguration = + new ClientConfigurationFactory().getConfig(); + clientConfiguration.setUseTcpKeepAlive(true); + + final SdkAsyncHttpClient httpClient = + KinesisUtil.createHttpClient( + clientConfiguration, + NettyNioAsyncHttpClient.builder(), + kinesisClientProperties); + + return KinesisUtil.createKinesisAsyncClient( + kinesisClientProperties, clientConfiguration, httpClient); + } + + @Override + protected void submitRequestEntries( + List<PutRecordsRequestEntry> requestEntries, + Consumer<Collection<PutRecordsRequestEntry>> requestResult, + Consumer<Exception> exceptionConsumer) { + + PutRecordsRequest batchRequest = + PutRecordsRequest.builder().records(requestEntries).streamName(streamName).build(); + + LOG.trace("Request to submit {} entries to KDS using KDS Sink.", requestEntries.size()); + + CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest); + + future.whenComplete( Review comment: nit: this is a large block, consider extracting to a new class or method ########## File path: flink-connectors/flink-connector-aws/src/main/java/org/apache/flink/connector/kinesis/sink/util/KinesisUtil.java ########## @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.sink.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.util.EnvironmentInformation; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider; +import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; +import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider; +import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; +import software.amazon.awssdk.http.SdkHttpConfigurationOption; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.Http2Configuration; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.profiles.ProfileFile; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; +import software.amazon.awssdk.utils.AttributeMap; + +import java.net.URI; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.Optional; +import java.util.Properties; + +/** Utility methods specific to Amazon Web Service SDK v2.x. */ +@Internal +public class KinesisUtil { Review comment: Instead of duplicating all of this code can we extract it to a shared common module? ########## File path: flink-connectors/flink-connector-aws/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkITCase.java ########## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.sink; + +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.kinesis.sink.testutils.KinesaliteContainer; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource; +import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import software.amazon.awssdk.services.kinesis.model.StreamStatus; + +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import static org.apache.flink.connector.kinesis.sink.util.AWSConfigConstants.AWS_ACCESS_KEY_ID; +import static org.apache.flink.connector.kinesis.sink.util.AWSConfigConstants.AWS_ENDPOINT; +import static org.apache.flink.connector.kinesis.sink.util.AWSConfigConstants.AWS_REGION; +import static org.apache.flink.connector.kinesis.sink.util.AWSConfigConstants.AWS_SECRET_ACCESS_KEY; +import static org.apache.flink.connector.kinesis.sink.util.AWSConfigConstants.TRUST_ALL_CERTIFICATES; +import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** IT cases for using Kinesis Data Streams Sink based on Kinesalite. */ +public class KinesisDataStreamsSinkITCase extends TestLogger { + + private static final String DEFAULT_FIRST_SHARD_NAME = "shardId-000000000000"; + + private final ElementConverter<String, PutRecordsRequestEntry> elementConverter = + (element, context) -> + PutRecordsRequestEntry.builder() + .data(SdkBytes.fromUtf8String(element)) + .partitionKey(String.valueOf(element.hashCode())) Review comment: I wonder if we should add an additional layer of abstraction here. Should the Kinesis Data Streams Sink accept a `SerializationSchema` and partition key extractor lambda? This implementation requires the end application to: - Serialise object to byte[] - Generate a partition key - Create a `PutRecordsRequestEntry` The responsibility of this `ElementConverter` seems bloated. Maybe we could create a boilerplate `ElementConverter` like the one below. In this case the application can use existing serialisation schema and does not care about AWS SDK objects: ``` KinesisDataStreamsSinkElementConverter.builder() .serializationSchema(new JsonSerialisationSchema()) .partitionKeyGenerator(element -> String.valueOf(element.hashCode()) .build(); ``` ########## File path: flink-connectors/flink-connector-aws/pom.xml ########## @@ -0,0 +1,130 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.15-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-aws</artifactId> + <name>Flink : Connectors : AWS</name> + <properties> + <aws.sdkv2.version>2.17.52</aws.sdkv2.version> + </properties> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-bridge</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <optional>true</optional> + </dependency> + + <!-- Amazon AWS SDK v2.x dependencies --> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>kinesis</artifactId> + <version>${aws.sdkv2.version}</version> + </dependency> + + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-core</artifactId> + <version>1.12.7</version> + </dependency> + + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>netty-nio-client</artifactId> + <version>${aws.sdkv2.version}</version> + </dependency> + + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>sts</artifactId> + <version>${aws.sdkv2.version}</version> + </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <scope>test</scope> + </dependency> Review comment: nit: we should group Flink dependencies together, move this beneath the Flink dependencies ########## File path: flink-connectors/flink-connector-aws/pom.xml ########## @@ -0,0 +1,130 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.15-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-aws</artifactId> + <name>Flink : Connectors : AWS</name> + <properties> + <aws.sdkv2.version>2.17.52</aws.sdkv2.version> + </properties> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-bridge</artifactId> Review comment: Why is this needed since we do not support Table API for this feature, yet? ########## File path: flink-connectors/flink-connector-aws/pom.xml ########## @@ -0,0 +1,130 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.15-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-aws</artifactId> + <name>Flink : Connectors : AWS</name> + <properties> + <aws.sdkv2.version>2.17.52</aws.sdkv2.version> + </properties> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-bridge</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <optional>true</optional> + </dependency> + + <!-- Amazon AWS SDK v2.x dependencies --> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>kinesis</artifactId> + <version>${aws.sdkv2.version}</version> + </dependency> + + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-core</artifactId> + <version>1.12.7</version> + </dependency> + + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>netty-nio-client</artifactId> + <version>${aws.sdkv2.version}</version> + </dependency> + + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>sts</artifactId> + <version>${aws.sdkv2.version}</version> + </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-kinesis</artifactId> + <version>1.12.7</version> Review comment: Why not implement with SDK v2? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
