This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
commit 6484f9d1d80f61247c78c7e6ba6856820321ef02 Author: Danny Cranmer <[email protected]> AuthorDate: Fri Jun 30 11:33:53 2023 +0100 [FLINK-32208] Adding managed flink-connector-base to fix convergence issue. Remove DDB dependency on Guava --- .../reader/PollingKinesisShardSplitReader.java | 6 +- flink-connector-dynamodb/pom.xml | 6 -- .../dynamodb/sink/DynamoDbSinkWriter.java | 4 +- .../dynamodb/table/DynamoDbDynamicSinkFactory.java | 8 +- .../dynamodb/sink/DynamoDbSinkITCase.java | 7 +- .../dynamodb/sink/DynamoDbSinkWriterTest.java | 70 +++++++-------- .../dynamodb/sink/DynamoDbWriteRequestTest.java | 4 +- .../sink/DynamoDbWriterStateSerializerTest.java | 4 +- .../table/DynamoDbDynamicSinkFactoryTest.java | 4 +- .../RowDataToAttributeValueConverterTest.java | 48 +++++------ .../util/DynamoDbSerializationUtilTest.java | 26 +++--- .../dynamodb/util/PrimaryKeyBuilderTest.java | 99 +++++++++++----------- .../src/main/resources/META-INF/NOTICE | 10 +-- .../src/main/resources/META-INF/NOTICE | 1 + pom.xml | 21 +++++ tools/maven/checkstyle.xml | 2 +- 16 files changed, 165 insertions(+), 155 deletions(-) diff --git a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java index ab20e2a..354dc26 100644 --- a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java +++ b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java @@ -27,8 +27,6 @@ import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState; import org.apache.flink.connector.kinesis.source.split.StartingPosition; -import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet; - import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.Record; @@ -41,6 +39,8 @@ import java.util.Deque; import java.util.Iterator; import java.util.Set; +import static java.util.Collections.singleton; + /** * An implementation of the SplitReader that periodically polls the Kinesis stream to retrieve * records. @@ -148,7 +148,7 @@ public class PollingKinesisShardSplitReader implements SplitReader<Record, Kines if (recordsIterator.hasNext()) { return Collections.emptySet(); } - return isComplete ? ImmutableSet.of(splitId) : Collections.emptySet(); + return isComplete ? singleton(splitId) : Collections.emptySet(); } } } diff --git a/flink-connector-dynamodb/pom.xml b/flink-connector-dynamodb/pom.xml index 40817d1..3a4e948 100644 --- a/flink-connector-dynamodb/pom.xml +++ b/flink-connector-dynamodb/pom.xml @@ -92,12 +92,6 @@ under the License. <optional>true</optional> </dependency> - <!-- Other third-party dependencies --> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - <!-- Test dependencies --> <dependency> <groupId>org.apache.flink</groupId> diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java index 30013b0..e0ef4c4 100644 --- a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java +++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java @@ -31,7 +31,6 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.util.CollectionUtil; -import com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; @@ -53,6 +52,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; +import static java.util.Collections.singletonMap; import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier; import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier; import static org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier; @@ -184,7 +184,7 @@ class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRe .getClient() .batchWriteItem( BatchWriteItemRequest.builder() - .requestItems(ImmutableMap.of(tableName, items)) + .requestItems(singletonMap(tableName, items)) .build()); future.whenComplete( diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java index 41a7282..008828c 100644 --- a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java +++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java @@ -25,8 +25,6 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.factories.FactoryUtil; -import com.google.common.collect.ImmutableSet; - import java.util.HashSet; import java.util.Set; @@ -72,6 +70,10 @@ public class DynamoDbDynamicSinkFactory extends AsyncDynamicTableSinkFactory { @Override public Set<ConfigOption<?>> requiredOptions() { - return ImmutableSet.of(TABLE_NAME, AWS_REGION); + final Set<ConfigOption<?>> requiredOptions = new HashSet<>(); + requiredOptions.add(TABLE_NAME); + requiredOptions.add(AWS_REGION); + + return requiredOptions; } } diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java index be00a3b..dafcac8 100644 --- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java +++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java @@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator; import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.util.StringUtils; -import com.google.common.collect.ImmutableList; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -45,11 +44,13 @@ import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.UUID; +import static java.util.Collections.singletonList; import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ACCESS_KEY_ID; import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER; import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT; @@ -151,7 +152,7 @@ public class DynamoDbSinkITCase { public void deduplicatesOnPartitionKey() throws Exception { new Scenario(env.fromCollection(getItemsWithDuplicatedPartitionKey())) .withTableName(testTableName) - .withOverwriteByPartitionKeys(ImmutableList.of(PARTITION_KEY)) + .withOverwriteByPartitionKeys(singletonList(PARTITION_KEY)) .withBufferMaxTimeMS(60 * 1000) .withExpectedElements(1) .withMaxInflightReqs(1) @@ -162,7 +163,7 @@ public class DynamoDbSinkITCase { public void deduplicatesOnCompositeKeyAndNewerItemTakesPrecedence() throws Exception { new Scenario(env.fromCollection(getItemsWithDuplicatedCompositeKey())) .withTableName(testTableName) - .withOverwriteByPartitionKeys(ImmutableList.of(PARTITION_KEY, SORT_KEY)) + .withOverwriteByPartitionKeys(Arrays.asList(PARTITION_KEY, SORT_KEY)) .withBufferMaxTimeMS(60 * 1000) // more than one in-flight request may cause a race condition // where the first request to complete will take precedence diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java index 60870c5..d37e184 100644 --- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java +++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java @@ -22,8 +22,6 @@ import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; import software.amazon.awssdk.awscore.exception.AwsErrorDetails; import software.amazon.awssdk.core.exception.SdkClientException; @@ -42,7 +40,9 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest; import software.amazon.awssdk.services.sts.model.StsException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -51,8 +51,10 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; -import java.util.stream.Collectors; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link DynamoDbSinkWriter}. */ @@ -67,13 +69,13 @@ public class DynamoDbSinkWriterTest { public void testSuccessfulRequestWithNoDeduplication() throws Exception { List<String> overwriteByPartitionKeys = Collections.emptyList(); List<DynamoDbWriteRequest> inputRequests = - ImmutableList.of( + Arrays.asList( sinkPutRequest(item("pk", "1")), sinkPutRequest(item("pk", "2")), sinkDeleteRequest(item("pk", "3")), sinkDeleteRequest(item("pk", "4"))); List<WriteRequest> expectedClientRequests = - ImmutableList.of( + Arrays.asList( dynamoDbPutRequest(item("pk", "1")), dynamoDbPutRequest(item("pk", "2")), dynamoDbDeleteRequest(item("pk", "3")), @@ -95,11 +97,11 @@ public class DynamoDbSinkWriterTest { @Test public void testPutRequestPartitionKeyDeduplication() throws Exception { - List<String> overwriteByPartitionKeys = ImmutableList.of(PARTITION_KEY); + List<String> overwriteByPartitionKeys = singletonList(PARTITION_KEY); List<DynamoDbWriteRequest> inputRequests = - ImmutableList.of(sinkPutRequest(item("pk", "1")), sinkPutRequest(item("pk", "2"))); + Arrays.asList(sinkPutRequest(item("pk", "1")), sinkPutRequest(item("pk", "2"))); List<WriteRequest> expectedClientRequests = - ImmutableList.of(dynamoDbPutRequest(item("pk", "2"))); + Arrays.asList(dynamoDbPutRequest(item("pk", "2"))); TrackingDynamoDbAsyncClient trackingDynamoDbAsyncClient = new TrackingDynamoDbAsyncClient(); DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter = @@ -117,12 +119,12 @@ public class DynamoDbSinkWriterTest { @Test public void testDeleteRequestPartitionKeyDeduplication() throws Exception { - List<String> overwriteByPartitionKeys = ImmutableList.of(PARTITION_KEY); + List<String> overwriteByPartitionKeys = singletonList(PARTITION_KEY); List<DynamoDbWriteRequest> inputRequests = - ImmutableList.of( + Arrays.asList( sinkDeleteRequest(item("pk", "1")), sinkDeleteRequest(item("pk", "2"))); List<WriteRequest> expectedClientRequests = - ImmutableList.of(dynamoDbDeleteRequest(item("pk", "2"))); + singletonList(dynamoDbDeleteRequest(item("pk", "2"))); TrackingDynamoDbAsyncClient trackingDynamoDbAsyncClient = new TrackingDynamoDbAsyncClient(); DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter = @@ -140,15 +142,15 @@ public class DynamoDbSinkWriterTest { @Test public void testMultipleKeyDeduplication() throws Exception { - List<String> overwriteByPartitionKeys = ImmutableList.of(PARTITION_KEY, SORT_KEY); + List<String> overwriteByPartitionKeys = Arrays.asList(PARTITION_KEY, SORT_KEY); List<DynamoDbWriteRequest> inputRequests = - ImmutableList.of( + Arrays.asList( sinkPutRequest(item("pk", "1")), sinkPutRequest(item("pk", "2")), sinkPutRequest(itemWithPayload("pk", "2", "string_payload_1")), sinkPutRequest(itemWithPayload("pk", "2", "string_payload_2"))); List<WriteRequest> expectedClientRequests = - ImmutableList.of( + Arrays.asList( dynamoDbPutRequest(item("pk", "1")), dynamoDbPutRequest(itemWithPayload("pk", "2", "string_payload_2"))); @@ -203,13 +205,13 @@ public class DynamoDbSinkWriterTest { public void testPartiallyFailedRequestRetriesFailedRecords() throws Exception { boolean failOnError = false; List<DynamoDbWriteRequest> inputRequests = - ImmutableList.of( + Arrays.asList( sinkPutRequest(item("put_will_fail_pk", "1")), sinkPutRequest(item("put_will_not_fail_pk", "2")), sinkDeleteRequest(item("delete_will_fail_pk", "3")), sinkDeleteRequest(item("delete_will_not_fail_pk", "4"))); List<DynamoDbWriteRequest> expectedRetriedRecords = - ImmutableList.of( + Arrays.asList( sinkPutRequest(item("put_will_fail_pk", "1")), sinkDeleteRequest(item("delete_will_fail_pk", "3"))); Predicate<String> failWhenPartitionKeyMatcher = str -> str.contains("will_fail_pk"); @@ -370,7 +372,7 @@ public class DynamoDbSinkWriterTest { } private List<DynamoDbWriteRequest> getDefaultInputRequests() { - return ImmutableList.of(sinkPutRequest(item("pk", "1")), sinkPutRequest(item("pk", "2"))); + return Arrays.asList(sinkPutRequest(item("pk", "1")), sinkPutRequest(item("pk", "2"))); } private Optional<Exception> getGenericRetryableException() { @@ -418,22 +420,22 @@ public class DynamoDbSinkWriterTest { } private Map<String, AttributeValue> item(String partitionKey, String sortKey) { - return ImmutableMap.<String, AttributeValue>builder() - .put(PARTITION_KEY, AttributeValue.builder().s(partitionKey).build()) - .put(SORT_KEY, AttributeValue.builder().n(sortKey).build()) - .put("string_payload", AttributeValue.builder().s("some_strings").build()) - .put("number_payload", AttributeValue.builder().n("1234").build()) - .build(); + final Map<String, AttributeValue> item = new HashMap<>(); + item.put(PARTITION_KEY, AttributeValue.builder().s(partitionKey).build()); + item.put(SORT_KEY, AttributeValue.builder().n(sortKey).build()); + item.put("string_payload", AttributeValue.builder().s("some_strings").build()); + item.put("number_payload", AttributeValue.builder().n("1234").build()); + return item; } private Map<String, AttributeValue> itemWithPayload( String partitionKey, String sortKey, String payload) { - return ImmutableMap.<String, AttributeValue>builder() - .put(PARTITION_KEY, AttributeValue.builder().s(partitionKey).build()) - .put(SORT_KEY, AttributeValue.builder().n(sortKey).build()) - .put("string_payload", AttributeValue.builder().s(payload).build()) - .put("number_payload", AttributeValue.builder().n("1234").build()) - .build(); + final Map<String, AttributeValue> item = new HashMap<>(); + item.put(PARTITION_KEY, AttributeValue.builder().s(partitionKey).build()); + item.put(SORT_KEY, AttributeValue.builder().n(sortKey).build()); + item.put("string_payload", AttributeValue.builder().s(payload).build()); + item.put("number_payload", AttributeValue.builder().n("1234").build()); + return item; } private static class TestAsyncDynamoDbClientProvider @@ -530,13 +532,12 @@ public class DynamoDbSinkWriterTest { .item() .get(PARTITION_KEY) .s())) - .collect(Collectors.toList()); + .collect(toList()); BatchWriteItemResponse.Builder responseBuilder = BatchWriteItemResponse.builder(); if (!failedRequests.isEmpty()) { responseBuilder = - responseBuilder.unprocessedItems( - ImmutableMap.of(TABLE_NAME, failedRequests)); + responseBuilder.unprocessedItems(singletonMap(TABLE_NAME, failedRequests)); } return CompletableFuture.completedFuture(responseBuilder.build()); } @@ -588,13 +589,12 @@ public class DynamoDbSinkWriterTest { "Write request cannot be empty"); } }) - .collect(Collectors.toList()); + .collect(toList()); BatchWriteItemResponse.Builder responseBuilder = BatchWriteItemResponse.builder(); if (!failedRequests.isEmpty()) { responseBuilder = - responseBuilder.unprocessedItems( - ImmutableMap.of(TABLE_NAME, failedRequests)); + responseBuilder.unprocessedItems(singletonMap(TABLE_NAME, failedRequests)); } return CompletableFuture.completedFuture(responseBuilder.build()); } diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java index 779d15c..1c43c3c 100644 --- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java +++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java @@ -18,7 +18,6 @@ package org.apache.flink.connector.dynamodb.sink; -import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; @@ -28,6 +27,7 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; +import static java.util.Collections.singletonMap; import static org.assertj.core.api.Assertions.assertThat; class DynamoDbWriteRequestTest { @@ -51,7 +51,7 @@ class DynamoDbWriteRequestTest { DynamoDbWriteRequest dynamoDbWriteRequest = DynamoDbWriteRequest.builder() .setItem( - ImmutableMap.of( + singletonMap( "testKey", AttributeValue.builder().s("testValue").build())) .setType(DynamoDbWriteRequestType.PUT) .build(); diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java index 8174817..958efee 100644 --- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java +++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java @@ -21,13 +21,13 @@ package org.apache.flink.connector.dynamodb.sink; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; -import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import java.io.IOException; import java.nio.charset.StandardCharsets; +import static java.util.Collections.singletonMap; import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.getTestState; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -42,7 +42,7 @@ public class DynamoDbWriterStateSerializerTest { DynamoDbWriteRequest.builder() .setType(DynamoDbWriteRequestType.PUT) .setItem( - ImmutableMap.of( + singletonMap( "key", AttributeValue.builder().s(element).build())) .build(); diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java index 9a2e276..0bbe132 100644 --- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java +++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java @@ -31,7 +31,6 @@ import org.apache.flink.table.factories.TableOptionsBuilder; import org.apache.flink.table.factories.TestFormatFactory; import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext; -import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; import java.util.Collections; @@ -40,6 +39,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import static java.util.Collections.singletonMap; import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE; import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT; import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MAX_BATCH_SIZE; @@ -269,7 +269,7 @@ public class DynamoDbDynamicSinkFactoryTest { DynamoDbDynamicSink sinkWithStaticPartition = (DynamoDbDynamicSink) createTableSink(sinkSchema, partitionKeys, sinkOptions); - sinkWithStaticPartition.applyStaticPartition(ImmutableMap.of("no_op_key", "no_op_value")); + sinkWithStaticPartition.applyStaticPartition(singletonMap("no_op_key", "no_op_value")); // Verify no-op for applyStaticPartition assertThat(sinkWithStaticPartition).usingRecursiveComparison().isEqualTo(originalSink); diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java index 1109118..5566ae1 100644 --- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java +++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java @@ -27,7 +27,6 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.DataType; -import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; @@ -39,6 +38,7 @@ import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; +import static java.util.Collections.singletonMap; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link RowDataToAttributeValueConverter}. */ @@ -56,7 +56,7 @@ public class RowDataToAttributeValueConverterTest { rowDataToAttributeValueConverter.convertRowData( createElement(StringData.fromString(value))); Map<String, AttributeValue> expectedResult = - ImmutableMap.of(key, AttributeValue.builder().s(value).build()); + singletonMap(key, AttributeValue.builder().s(value).build()); assertThat(actualResult).containsAllEntriesOf(expectedResult); } @@ -73,7 +73,7 @@ public class RowDataToAttributeValueConverterTest { rowDataToAttributeValueConverter.convertRowData( createElement(StringData.fromString(value))); Map<String, AttributeValue> expectedResult = - ImmutableMap.of(key, AttributeValue.builder().s(value).build()); + singletonMap(key, AttributeValue.builder().s(value).build()); assertThat(actualResult).containsAllEntriesOf(expectedResult); } @@ -90,7 +90,7 @@ public class RowDataToAttributeValueConverterTest { rowDataToAttributeValueConverter.convertRowData( createElement(StringData.fromString(value))); Map<String, AttributeValue> expectedResult = - ImmutableMap.of(key, AttributeValue.builder().s(value).build()); + singletonMap(key, AttributeValue.builder().s(value).build()); assertThat(actualResult).containsAllEntriesOf(expectedResult); } @@ -106,7 +106,7 @@ public class RowDataToAttributeValueConverterTest { Map<String, AttributeValue> actualResult = rowDataToAttributeValueConverter.convertRowData(createElement(value)); Map<String, AttributeValue> expectedResult = - ImmutableMap.of(key, AttributeValue.builder().bool(value).build()); + singletonMap(key, AttributeValue.builder().bool(value).build()); assertThat(actualResult).containsAllEntriesOf(expectedResult); } @@ -123,7 +123,7 @@ public class RowDataToAttributeValueConverterTest { rowDataToAttributeValueConverter.convertRowData( createElement(DecimalData.fromBigDecimal(value, 5, 4))); Map<String, AttributeValue> expectedResult = - ImmutableMap.of(key, AttributeValue.builder().n("1.0010").build()); + singletonMap(key, AttributeValue.builder().n("1.0010").build()); assertThat(actualResult).containsAllEntriesOf(expectedResult); } @@ -139,7 +139,7 @@ public class RowDataToAttributeValueConverterTest { Map<String, AttributeValue> actualResult = rowDataToAttributeValueConverter.convertRowData(createElement(value)); Map<String, AttributeValue> expectedResult = - ImmutableMap.of(key, AttributeValue.builder().n("5").build()); + singletonMap(key, AttributeValue.builder().n("5").build()); assertThat(actualResult).containsAllEntriesOf(expectedResult); } @@ -155,7 +155,7 @@ public class RowDataToAttributeValueConverterTest { Map<String, AttributeValue> actualResult = rowDataToAttributeValueConverter.convertRowData(createElement(value)); Map<String, AttributeValue> expectedResult = - ImmutableMap.of(key, AttributeValue.builder().n("256").build()); + singletonMap(key, AttributeValue.builder().n("256").build()); assertThat(actualResult).containsAllEntriesOf(expectedResult); } @@ -171,7 +171,7 @@ public class RowDataToAttributeValueConverterTest { Map<String, AttributeValue> actualResult = rowDataToAttributeValueConverter.convertRowData(createElement(value)); Map<String, AttributeValue> expectedResult = - ImmutableMap.of(key, AttributeValue.builder().n("65536").build()); + singletonMap(key, AttributeValue.builder().n("65536").build()); assertThat(actualResult).containsAllEntriesOf(expectedResult); } @@ -187,7 +187,7 @@ public class RowDataToAttributeValueConverterTest { Map<String, AttributeValue> actualResult = rowDataToAttributeValueConverter.convertRowData(createElement(value)); Map<String, AttributeValue> expectedResult = - ImmutableMap.of(key, AttributeValue.builder().n("4294967295").build()); + singletonMap(key, AttributeValue.builder().n("4294967295").build()); assertThat(actualResult).containsAllEntriesOf(expectedResult); } @@ -203,7 +203,7 @@ public class RowDataToAttributeValueConverterTest { Map<String, AttributeValue> actualResult = rowDataToAttributeValueConverter.convertRowData(createElement(value)); Map<String, AttributeValue> expectedResult = - ImmutableMap.of(key, AttributeValue.builder().n("1.23456791E17").build()); + singletonMap(key, AttributeValue.builder().n("1.23456791E17").build()); assertThat(actualResult).containsAllEntriesOf(expectedResult); } @@ -219,7 +219,7 @@ public class RowDataToAttributeValueConverterTest { Map<String, AttributeValue> actualResult = rowDataToAttributeValueConverter.convertRowData(createElement(value)); Map<String, AttributeValue> expectedResult = - ImmutableMap.of(key, AttributeValue.builder().n("1.234567891234568E19").build()); + singletonMap(key, AttributeValue.builder().n("1.234567891234568E19").build()); assertThat(actualResult).containsAllEntriesOf(expectedResult); } @@ -236,7 +236,7 @@ public class RowDataToAttributeValueConverterTest { rowDataToAttributeValueConverter.convertRowData( createElement(TimestampData.fromLocalDateTime(value))); Map<String, AttributeValue> expectedResult = - ImmutableMap.of(key, AttributeValue.builder().s("2022-11-10T00:00").build()); + singletonMap(key, AttributeValue.builder().s("2022-11-10T00:00").build()); assertThat(actualResult).containsAllEntriesOf(expectedResult); } @@ -254,7 +254,7 @@ public class RowDataToAttributeValueConverterTest { rowDataToAttributeValueConverter.convertRowData( createArray(value, StringData::fromString)); Map<String, AttributeValue> expectedResult = - ImmutableMap.of( + singletonMap( key, AttributeValue.builder() .l( @@ -278,7 +278,7 @@ public class RowDataToAttributeValueConverterTest { Map<String, AttributeValue> actualResult = rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t)); Map<String, AttributeValue> expectedResult = - ImmutableMap.of( + singletonMap( key, AttributeValue.builder() .l( @@ -307,7 +307,7 @@ public class RowDataToAttributeValueConverterTest { rowDataToAttributeValueConverter.convertRowData( createArray(value, d -> DecimalData.fromBigDecimal(d, 1, 0))); Map<String, AttributeValue> expectedResult = - ImmutableMap.of( + singletonMap( key, AttributeValue.builder() .l( @@ -335,7 +335,7 @@ public class RowDataToAttributeValueConverterTest { Map<String, AttributeValue> actualResult = rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t)); Map<String, AttributeValue> expectedResult = - ImmutableMap.of( + singletonMap( key, AttributeValue.builder() .l( @@ -363,7 +363,7 @@ public class RowDataToAttributeValueConverterTest { Map<String, AttributeValue> actualResult = rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t)); Map<String, AttributeValue> expectedResult = - ImmutableMap.of( + singletonMap( key, AttributeValue.builder() .l( @@ -390,7 +390,7 @@ public class RowDataToAttributeValueConverterTest { Map<String, AttributeValue> actualResult = rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t)); Map<String, AttributeValue> expectedResult = - ImmutableMap.of( + singletonMap( key, AttributeValue.builder() .l( @@ -418,7 +418,7 @@ public class RowDataToAttributeValueConverterTest { Map<String, AttributeValue> actualResult = rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t)); Map<String, AttributeValue> expectedResult = - ImmutableMap.of( + singletonMap( key, AttributeValue.builder() .l( @@ -445,7 +445,7 @@ public class RowDataToAttributeValueConverterTest { Map<String, AttributeValue> actualResult = rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t)); Map<String, AttributeValue> expectedResult = - ImmutableMap.of( + singletonMap( key, AttributeValue.builder() .l( @@ -473,7 +473,7 @@ public class RowDataToAttributeValueConverterTest { Map<String, AttributeValue> actualResult = rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t)); Map<String, AttributeValue> expectedResult = - ImmutableMap.of( + singletonMap( key, AttributeValue.builder() .l( @@ -502,7 +502,7 @@ public class RowDataToAttributeValueConverterTest { rowDataToAttributeValueConverter.convertRowData( createArray(value, TimestampData::fromLocalDateTime)); Map<String, AttributeValue> expectedResult = - ImmutableMap.of( + singletonMap( key, AttributeValue.builder() .l( @@ -531,7 +531,7 @@ public class RowDataToAttributeValueConverterTest { rowDataToAttributeValueConverter.convertRowData( createArray(value, TimestampData::fromInstant)); Map<String, AttributeValue> expectedResult = - ImmutableMap.of( + singletonMap( key, AttributeValue.builder() .l( diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java index b44d065..1c128fb 100644 --- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java +++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java @@ -21,7 +21,6 @@ package org.apache.flink.connector.dynamodb.util; import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest; import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType; -import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; @@ -67,21 +66,16 @@ public class DynamoDbSerializationUtilTest { AttributeValue.builder().s("s1").build(), AttributeValue.builder().s("s2").build()) .build()); - item.put( - "map", - AttributeValue.builder() - .m( - ImmutableMap.of( - "key1", AttributeValue.builder().s("string").build(), - "key2", AttributeValue.builder().n("12345").build(), - "binary", - AttributeValue.builder() - .b( - SdkBytes.fromByteArray( - new byte[] {1, 2, 3})) - .build(), - "null", AttributeValue.builder().nul(true).build())) - .build()); + + Map<String, AttributeValue> nestedMap = new HashMap<>(); + nestedMap.put("key1", AttributeValue.builder().s("string").build()); + nestedMap.put("key2", AttributeValue.builder().n("12345").build()); + nestedMap.put( + "binary", + AttributeValue.builder().b(SdkBytes.fromByteArray(new byte[] {1, 2, 3})).build()); + nestedMap.put("null", AttributeValue.builder().nul(true).build()); + + item.put("map", AttributeValue.builder().m(nestedMap).build()); DynamoDbWriteRequest dynamoDbWriteRequest = DynamoDbWriteRequest.builder() .setItem(item) diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilderTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilderTest.java index 1bf73ba..b6fb3fa 100644 --- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilderTest.java +++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilderTest.java @@ -21,8 +21,6 @@ package org.apache.flink.connector.dynamodb.util; import org.apache.flink.connector.dynamodb.sink.InvalidConfigurationException; import org.apache.flink.connector.dynamodb.sink.InvalidRequestException; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; @@ -31,8 +29,13 @@ import software.amazon.awssdk.services.dynamodb.model.PutRequest; import software.amazon.awssdk.services.dynamodb.model.WriteRequest; import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; import java.util.Map; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -42,18 +45,21 @@ public class PrimaryKeyBuilderTest { private static final String PARTITION_KEY_NAME = "part_key_name"; private static final String SORT_KEY_NAME = "sort_key_name"; - private ImmutableMap<String, AttributeValue> createItemValues() { - return ImmutableMap.of( + private Map<String, AttributeValue> createItemValues() { + final Map<String, AttributeValue> values = new HashMap<>(); + values.put( PARTITION_KEY_NAME, AttributeValue.builder() .s("123") .n("456") .b(SdkBytes.fromString("789", StandardCharsets.UTF_8)) - .build(), - SORT_KEY_NAME, - AttributeValue.builder().s("101112").build(), - "some_item", - AttributeValue.builder().bool(false).build()); + .build()); + + values.put(SORT_KEY_NAME, AttributeValue.builder().s("101112").build()); + + values.put("some_item", AttributeValue.builder().bool(false).build()); + + return values; } public WriteRequest createPutItemRequest(Map<String, AttributeValue> itemValues) { @@ -70,30 +76,27 @@ public class PrimaryKeyBuilderTest { @Test public void testPrimaryKeyDelimited() { - WriteRequest putRequestOne = - createPutItemRequest( - ImmutableMap.of( - PARTITION_KEY_NAME, - AttributeValue.builder().s("ab").build(), - SORT_KEY_NAME, - AttributeValue.builder().s("cd").build())); - - WriteRequest putRequestTwo = - createPutItemRequest( - ImmutableMap.of( - PARTITION_KEY_NAME, - AttributeValue.builder().s("a").build(), - SORT_KEY_NAME, - AttributeValue.builder().s("bcd").build())); + Map<String, AttributeValue> itemValuesOne = new HashMap<>(); + itemValuesOne.put(PARTITION_KEY_NAME, AttributeValue.builder().s("ab").build()); + itemValuesOne.put(SORT_KEY_NAME, AttributeValue.builder().s("cd").build()); + + WriteRequest putRequestOne = createPutItemRequest(itemValuesOne); + + Map<String, AttributeValue> itemValuesTwo = new HashMap<>(); + itemValuesTwo.put(PARTITION_KEY_NAME, AttributeValue.builder().s("a").build()); + itemValuesTwo.put(SORT_KEY_NAME, AttributeValue.builder().s("bcd").build()); + + WriteRequest putRequestTwo = createPutItemRequest(itemValuesTwo); PrimaryKeyBuilder keyBuilder = - new PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME, SORT_KEY_NAME)); + new PrimaryKeyBuilder(Arrays.asList(PARTITION_KEY_NAME, SORT_KEY_NAME)); + assertThat(keyBuilder.build(putRequestOne)).isNotEqualTo(keyBuilder.build(putRequestTwo)); } @Test public void testPartitionKeysOfTwoDifferentRequestsEqual() { - PrimaryKeyBuilder keyBuilder = new PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME)); + PrimaryKeyBuilder keyBuilder = new PrimaryKeyBuilder(singletonList(PARTITION_KEY_NAME)); assertThat(keyBuilder.build(createPutItemRequest(createItemValues()))) .isEqualTo(keyBuilder.build(createDeleteItemRequest(createItemValues()))); } @@ -101,7 +104,7 @@ public class PrimaryKeyBuilderTest { @Test public void testCompositeKeysOfTwoDifferentRequestsEqual() { PrimaryKeyBuilder keyBuilder = - new PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME, SORT_KEY_NAME)); + new PrimaryKeyBuilder(Arrays.asList(PARTITION_KEY_NAME, SORT_KEY_NAME)); assertThat(keyBuilder.build(createPutItemRequest(createItemValues()))) .isEqualTo(keyBuilder.build(createDeleteItemRequest(createItemValues()))); @@ -110,7 +113,7 @@ public class PrimaryKeyBuilderTest { @Test public void testExceptOnEmptyPartitionKeys() { assertThatExceptionOfType(InvalidConfigurationException.class) - .isThrownBy(() -> new PrimaryKeyBuilder(ImmutableList.of())) + .isThrownBy(() -> new PrimaryKeyBuilder(emptyList())) .withMessageContaining( "Unable to construct partition key as overwriteByPartitionKeys configuration not provided."); } @@ -120,7 +123,7 @@ public class PrimaryKeyBuilderTest { assertThatExceptionOfType(InvalidRequestException.class) .isThrownBy( () -> - new PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME)) + new PrimaryKeyBuilder(singletonList(PARTITION_KEY_NAME)) .build(WriteRequest.builder().build())) .withMessageContaining("Empty write request"); } @@ -130,7 +133,7 @@ public class PrimaryKeyBuilderTest { assertThatExceptionOfType(InvalidRequestException.class) .isThrownBy( () -> - new PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME)) + new PrimaryKeyBuilder(singletonList(PARTITION_KEY_NAME)) .build( WriteRequest.builder() .putRequest(PutRequest.builder().build()) @@ -143,7 +146,7 @@ public class PrimaryKeyBuilderTest { assertThatExceptionOfType(InvalidRequestException.class) .isThrownBy( () -> - new PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME)) + new PrimaryKeyBuilder(singletonList(PARTITION_KEY_NAME)) .build( WriteRequest.builder() .deleteRequest( @@ -154,26 +157,26 @@ public class PrimaryKeyBuilderTest { @Test public void testExceptWhenNoPartitionKey() { - ImmutableMap<String, AttributeValue> itemValues = - ImmutableMap.of("some_item", AttributeValue.builder().bool(false).build()); + Map<String, AttributeValue> itemValues = + singletonMap("some_item", AttributeValue.builder().bool(false).build()); assertThatExceptionOfType(InvalidRequestException.class) .isThrownBy( () -> - new PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME)) + new PrimaryKeyBuilder(singletonList(PARTITION_KEY_NAME)) .build(createPutItemRequest(itemValues))) .withMessageContaining("does not contain partition key part_key_name."); } @Test public void testExceptWhenEmptyKey() { - ImmutableMap<String, AttributeValue> itemValues = - ImmutableMap.of(PARTITION_KEY_NAME, AttributeValue.builder().s("").build()); + Map<String, AttributeValue> itemValues = + singletonMap(PARTITION_KEY_NAME, AttributeValue.builder().s("").build()); assertThatExceptionOfType(InvalidRequestException.class) .isThrownBy( () -> - new PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME)) + new PrimaryKeyBuilder(singletonList(PARTITION_KEY_NAME)) .build(createPutItemRequest(itemValues))) .withMessageContaining( "Partition key or sort key attributes require non-empty values."); @@ -181,35 +184,29 @@ public class PrimaryKeyBuilderTest { @Test public void testExceptWhenNoPartitionKeyCompositeKey() { - ImmutableMap<String, AttributeValue> itemValues = - ImmutableMap.of( - SORT_KEY_NAME, - AttributeValue.builder().s("101112").build(), - "some_item", - AttributeValue.builder().bool(false).build()); + Map<String, AttributeValue> itemValues = new HashMap<>(); + itemValues.put(SORT_KEY_NAME, AttributeValue.builder().s("101112").build()); + itemValues.put("some_item", AttributeValue.builder().bool(false).build()); assertThatExceptionOfType(InvalidRequestException.class) .isThrownBy( () -> - new PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME)) + new PrimaryKeyBuilder(singletonList(PARTITION_KEY_NAME)) .build(createPutItemRequest(itemValues))) .withMessageContaining("does not contain partition key part_key_name."); } @Test public void testExceptWhenNoSortKey() { - ImmutableMap<String, AttributeValue> itemValues = - ImmutableMap.of( - PARTITION_KEY_NAME, - AttributeValue.builder().s("101112").build(), - "some_item", - AttributeValue.builder().bool(false).build()); + Map<String, AttributeValue> itemValues = new HashMap<>(); + itemValues.put(PARTITION_KEY_NAME, AttributeValue.builder().s("101112").build()); + itemValues.put("some_item", AttributeValue.builder().bool(false).build()); assertThatExceptionOfType(InvalidRequestException.class) .isThrownBy( () -> new PrimaryKeyBuilder( - ImmutableList.of(PARTITION_KEY_NAME, SORT_KEY_NAME)) + Arrays.asList(PARTITION_KEY_NAME, SORT_KEY_NAME)) .build(createPutItemRequest(itemValues))) .withMessageContaining("does not contain partition key sort_key_name."); } diff --git a/flink-connector-kinesis/src/main/resources/META-INF/NOTICE b/flink-connector-kinesis/src/main/resources/META-INF/NOTICE index 2f5d166..af79451 100644 --- a/flink-connector-kinesis/src/main/resources/META-INF/NOTICE +++ b/flink-connector-kinesis/src/main/resources/META-INF/NOTICE @@ -8,7 +8,6 @@ This project bundles the following dependencies under the Apache Software Licens - software.amazon.ion:ion-java:1.0.2 - software.amazon.eventstream:eventstream:1.0.1 -- software.amazon.awssdk:arns:2.20.32 - software.amazon.awssdk:utils:2.20.32 - software.amazon.awssdk:third-party-jackson-dataformat-cbor:2.20.32 - software.amazon.awssdk:third-party-jackson-core:2.20.32 @@ -28,6 +27,7 @@ This project bundles the following dependencies under the Apache Software Licens - software.amazon.awssdk:aws-core:2.20.32 - software.amazon.awssdk:aws-cbor-protocol:2.20.32 - software.amazon.awssdk:auth:2.20.32 +- software.amazon.awssdk:arns:2.20.32 - software.amazon.awssdk:apache-client:2.20.32 - software.amazon.awssdk:annotations:2.20.32 - org.apache.httpcomponents:httpcore:4.4.14 @@ -42,10 +42,10 @@ This project bundles the following dependencies under the Apache Software Licens - io.netty:netty-codec-http:4.1.86.Final - io.netty:netty-codec-http2:4.1.86.Final - io.netty:netty-buffer:4.1.86.Final -- com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.13.4 -- com.fasterxml.jackson.core:jackson-databind:2.13.4.2 -- com.fasterxml.jackson.core:jackson-core:2.13.4 -- com.fasterxml.jackson.core:jackson-annotations:2.13.4 +- com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.14.3 +- com.fasterxml.jackson.core:jackson-databind:2.14.3 +- com.fasterxml.jackson.core:jackson-core:2.14.3 +- com.fasterxml.jackson.core:jackson-annotations:2.14.3 - com.amazonaws:jmespath-java:1.12.439 - com.amazonaws:dynamodb-streams-kinesis-adapter:1.5.3 - com.amazonaws:aws-java-sdk-sts:1.12.439 diff --git a/flink-sql-connector-aws-kinesis-streams/src/main/resources/META-INF/NOTICE b/flink-sql-connector-aws-kinesis-streams/src/main/resources/META-INF/NOTICE index 2fb089e..3185b03 100644 --- a/flink-sql-connector-aws-kinesis-streams/src/main/resources/META-INF/NOTICE +++ b/flink-sql-connector-aws-kinesis-streams/src/main/resources/META-INF/NOTICE @@ -26,6 +26,7 @@ This project bundles the following dependencies under the Apache Software Licens - software.amazon.awssdk:aws-core:2.20.32 - software.amazon.awssdk:aws-cbor-protocol:2.20.32 - software.amazon.awssdk:auth:2.20.32 +- software.amazon.awssdk:arns:2.20.32 - software.amazon.awssdk:apache-client:2.20.32 - software.amazon.awssdk:annotations:2.20.32 - org.apache.httpcomponents:httpcore:4.4.14 diff --git a/pom.xml b/pom.xml index a1391b8..d5a03ac 100644 --- a/pom.xml +++ b/pom.xml @@ -158,6 +158,27 @@ under the License. <dependencyManagement> <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-aws-base</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-base</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-bom</artifactId> diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml index 5b90006..29ff116 100644 --- a/tools/maven/checkstyle.xml +++ b/tools/maven/checkstyle.xml @@ -200,7 +200,7 @@ This file is based on the checkstyle file of Apache Beam. <!-- Checks for out of order import statements. --> <property name="severity" value="error"/> <property name="groups" - value="org.apache.flink,*,javax,java,scala"/> + value="org.apache.flink,org.apache.flink.shaded,*,javax,java,scala"/> <property name="separated" value="true"/> <property name="sortStaticImportsAlphabetically" value="true"/> <property name="option" value="bottom"/>
