This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git

commit 6484f9d1d80f61247c78c7e6ba6856820321ef02
Author: Danny Cranmer <[email protected]>
AuthorDate: Fri Jun 30 11:33:53 2023 +0100

    [FLINK-32208] Adding managed flink-connector-base to fix convergence issue. Remove DDB dependency on Guava
---
 .../reader/PollingKinesisShardSplitReader.java     |  6 +-
 flink-connector-dynamodb/pom.xml                   |  6 --
 .../dynamodb/sink/DynamoDbSinkWriter.java          |  4 +-
 .../dynamodb/table/DynamoDbDynamicSinkFactory.java |  8 +-
 .../dynamodb/sink/DynamoDbSinkITCase.java          |  7 +-
 .../dynamodb/sink/DynamoDbSinkWriterTest.java      | 70 +++++++--------
 .../dynamodb/sink/DynamoDbWriteRequestTest.java    |  4 +-
 .../sink/DynamoDbWriterStateSerializerTest.java    |  4 +-
 .../table/DynamoDbDynamicSinkFactoryTest.java      |  4 +-
 .../RowDataToAttributeValueConverterTest.java      | 48 +++++------
 .../util/DynamoDbSerializationUtilTest.java        | 26 +++---
 .../dynamodb/util/PrimaryKeyBuilderTest.java       | 99 +++++++++++-----------
 .../src/main/resources/META-INF/NOTICE             | 10 +--
 .../src/main/resources/META-INF/NOTICE             |  1 +
 pom.xml                                            | 21 +++++
 tools/maven/checkstyle.xml                         |  2 +-
 16 files changed, 165 insertions(+), 155 deletions(-)

diff --git 
a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java
 
b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java
index ab20e2a..354dc26 100644
--- 
a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java
+++ 
b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java
@@ -27,8 +27,6 @@ import 
org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
 import org.apache.flink.connector.kinesis.source.split.StartingPosition;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
-
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.Record;
 
@@ -41,6 +39,8 @@ import java.util.Deque;
 import java.util.Iterator;
 import java.util.Set;
 
+import static java.util.Collections.singleton;
+
 /**
  * An implementation of the SplitReader that periodically polls the Kinesis 
stream to retrieve
  * records.
@@ -148,7 +148,7 @@ public class PollingKinesisShardSplitReader implements 
SplitReader<Record, Kines
             if (recordsIterator.hasNext()) {
                 return Collections.emptySet();
             }
-            return isComplete ? ImmutableSet.of(splitId) : 
Collections.emptySet();
+            return isComplete ? singleton(splitId) : Collections.emptySet();
         }
     }
 }
diff --git a/flink-connector-dynamodb/pom.xml b/flink-connector-dynamodb/pom.xml
index 40817d1..3a4e948 100644
--- a/flink-connector-dynamodb/pom.xml
+++ b/flink-connector-dynamodb/pom.xml
@@ -92,12 +92,6 @@ under the License.
             <optional>true</optional>
         </dependency>
 
-        <!-- Other third-party dependencies -->
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-
         <!-- Test dependencies -->
         <dependency>
             <groupId>org.apache.flink</groupId>
diff --git 
a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
 
b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
index 30013b0..e0ef4c4 100644
--- 
a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
+++ 
b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
@@ -31,7 +31,6 @@ import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.util.CollectionUtil;
 
-import com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
@@ -53,6 +52,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
+import static java.util.Collections.singletonMap;
 import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
 import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
 import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
@@ -184,7 +184,7 @@ class DynamoDbSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, DynamoDbWriteRe
                         .getClient()
                         .batchWriteItem(
                                 BatchWriteItemRequest.builder()
-                                        
.requestItems(ImmutableMap.of(tableName, items))
+                                        .requestItems(singletonMap(tableName, 
items))
                                         .build());
 
         future.whenComplete(
diff --git 
a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java
 
b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java
index 41a7282..008828c 100644
--- 
a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java
+++ 
b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java
@@ -25,8 +25,6 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.factories.FactoryUtil;
 
-import com.google.common.collect.ImmutableSet;
-
 import java.util.HashSet;
 import java.util.Set;
 
@@ -72,6 +70,10 @@ public class DynamoDbDynamicSinkFactory extends 
AsyncDynamicTableSinkFactory {
 
     @Override
     public Set<ConfigOption<?>> requiredOptions() {
-        return ImmutableSet.of(TABLE_NAME, AWS_REGION);
+        final Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+        requiredOptions.add(TABLE_NAME);
+        requiredOptions.add(AWS_REGION);
+
+        return requiredOptions;
     }
 }
diff --git 
a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java
 
b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java
index be00a3b..dafcac8 100644
--- 
a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java
+++ 
b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java
@@ -32,7 +32,6 @@ import 
org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.util.StringUtils;
 
-import com.google.common.collect.ImmutableList;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -45,11 +44,13 @@ import 
software.amazon.awssdk.services.dynamodb.model.AttributeValue;
 
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
+import static java.util.Collections.singletonList;
 import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ACCESS_KEY_ID;
 import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
 import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
@@ -151,7 +152,7 @@ public class DynamoDbSinkITCase {
     public void deduplicatesOnPartitionKey() throws Exception {
         new Scenario(env.fromCollection(getItemsWithDuplicatedPartitionKey()))
                 .withTableName(testTableName)
-                .withOverwriteByPartitionKeys(ImmutableList.of(PARTITION_KEY))
+                .withOverwriteByPartitionKeys(singletonList(PARTITION_KEY))
                 .withBufferMaxTimeMS(60 * 1000)
                 .withExpectedElements(1)
                 .withMaxInflightReqs(1)
@@ -162,7 +163,7 @@ public class DynamoDbSinkITCase {
     public void deduplicatesOnCompositeKeyAndNewerItemTakesPrecedence() throws 
Exception {
         new Scenario(env.fromCollection(getItemsWithDuplicatedCompositeKey()))
                 .withTableName(testTableName)
-                .withOverwriteByPartitionKeys(ImmutableList.of(PARTITION_KEY, 
SORT_KEY))
+                .withOverwriteByPartitionKeys(Arrays.asList(PARTITION_KEY, 
SORT_KEY))
                 .withBufferMaxTimeMS(60 * 1000)
                 // more than one in-flight request may cause a race condition
                 // where the first request to complete will take precedence
diff --git 
a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
 
b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
index 60870c5..d37e184 100644
--- 
a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
+++ 
b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
 import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
 import software.amazon.awssdk.core.exception.SdkClientException;
@@ -42,7 +40,9 @@ import 
software.amazon.awssdk.services.dynamodb.model.WriteRequest;
 import software.amazon.awssdk.services.sts.model.StsException;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -51,8 +51,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.toList;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link DynamoDbSinkWriter}. */
@@ -67,13 +69,13 @@ public class DynamoDbSinkWriterTest {
     public void testSuccessfulRequestWithNoDeduplication() throws Exception {
         List<String> overwriteByPartitionKeys = Collections.emptyList();
         List<DynamoDbWriteRequest> inputRequests =
-                ImmutableList.of(
+                Arrays.asList(
                         sinkPutRequest(item("pk", "1")),
                         sinkPutRequest(item("pk", "2")),
                         sinkDeleteRequest(item("pk", "3")),
                         sinkDeleteRequest(item("pk", "4")));
         List<WriteRequest> expectedClientRequests =
-                ImmutableList.of(
+                Arrays.asList(
                         dynamoDbPutRequest(item("pk", "1")),
                         dynamoDbPutRequest(item("pk", "2")),
                         dynamoDbDeleteRequest(item("pk", "3")),
@@ -95,11 +97,11 @@ public class DynamoDbSinkWriterTest {
 
     @Test
     public void testPutRequestPartitionKeyDeduplication() throws Exception {
-        List<String> overwriteByPartitionKeys = 
ImmutableList.of(PARTITION_KEY);
+        List<String> overwriteByPartitionKeys = singletonList(PARTITION_KEY);
         List<DynamoDbWriteRequest> inputRequests =
-                ImmutableList.of(sinkPutRequest(item("pk", "1")), 
sinkPutRequest(item("pk", "2")));
+                Arrays.asList(sinkPutRequest(item("pk", "1")), 
sinkPutRequest(item("pk", "2")));
         List<WriteRequest> expectedClientRequests =
-                ImmutableList.of(dynamoDbPutRequest(item("pk", "2")));
+                Arrays.asList(dynamoDbPutRequest(item("pk", "2")));
 
         TrackingDynamoDbAsyncClient trackingDynamoDbAsyncClient = new 
TrackingDynamoDbAsyncClient();
         DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
@@ -117,12 +119,12 @@ public class DynamoDbSinkWriterTest {
 
     @Test
     public void testDeleteRequestPartitionKeyDeduplication() throws Exception {
-        List<String> overwriteByPartitionKeys = 
ImmutableList.of(PARTITION_KEY);
+        List<String> overwriteByPartitionKeys = singletonList(PARTITION_KEY);
         List<DynamoDbWriteRequest> inputRequests =
-                ImmutableList.of(
+                Arrays.asList(
                         sinkDeleteRequest(item("pk", "1")), 
sinkDeleteRequest(item("pk", "2")));
         List<WriteRequest> expectedClientRequests =
-                ImmutableList.of(dynamoDbDeleteRequest(item("pk", "2")));
+                singletonList(dynamoDbDeleteRequest(item("pk", "2")));
 
         TrackingDynamoDbAsyncClient trackingDynamoDbAsyncClient = new 
TrackingDynamoDbAsyncClient();
         DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
@@ -140,15 +142,15 @@ public class DynamoDbSinkWriterTest {
 
     @Test
     public void testMultipleKeyDeduplication() throws Exception {
-        List<String> overwriteByPartitionKeys = 
ImmutableList.of(PARTITION_KEY, SORT_KEY);
+        List<String> overwriteByPartitionKeys = Arrays.asList(PARTITION_KEY, 
SORT_KEY);
         List<DynamoDbWriteRequest> inputRequests =
-                ImmutableList.of(
+                Arrays.asList(
                         sinkPutRequest(item("pk", "1")),
                         sinkPutRequest(item("pk", "2")),
                         sinkPutRequest(itemWithPayload("pk", "2", 
"string_payload_1")),
                         sinkPutRequest(itemWithPayload("pk", "2", 
"string_payload_2")));
         List<WriteRequest> expectedClientRequests =
-                ImmutableList.of(
+                Arrays.asList(
                         dynamoDbPutRequest(item("pk", "1")),
                         dynamoDbPutRequest(itemWithPayload("pk", "2", 
"string_payload_2")));
 
@@ -203,13 +205,13 @@ public class DynamoDbSinkWriterTest {
     public void testPartiallyFailedRequestRetriesFailedRecords() throws 
Exception {
         boolean failOnError = false;
         List<DynamoDbWriteRequest> inputRequests =
-                ImmutableList.of(
+                Arrays.asList(
                         sinkPutRequest(item("put_will_fail_pk", "1")),
                         sinkPutRequest(item("put_will_not_fail_pk", "2")),
                         sinkDeleteRequest(item("delete_will_fail_pk", "3")),
                         sinkDeleteRequest(item("delete_will_not_fail_pk", 
"4")));
         List<DynamoDbWriteRequest> expectedRetriedRecords =
-                ImmutableList.of(
+                Arrays.asList(
                         sinkPutRequest(item("put_will_fail_pk", "1")),
                         sinkDeleteRequest(item("delete_will_fail_pk", "3")));
         Predicate<String> failWhenPartitionKeyMatcher = str -> 
str.contains("will_fail_pk");
@@ -370,7 +372,7 @@ public class DynamoDbSinkWriterTest {
     }
 
     private List<DynamoDbWriteRequest> getDefaultInputRequests() {
-        return ImmutableList.of(sinkPutRequest(item("pk", "1")), 
sinkPutRequest(item("pk", "2")));
+        return Arrays.asList(sinkPutRequest(item("pk", "1")), 
sinkPutRequest(item("pk", "2")));
     }
 
     private Optional<Exception> getGenericRetryableException() {
@@ -418,22 +420,22 @@ public class DynamoDbSinkWriterTest {
     }
 
     private Map<String, AttributeValue> item(String partitionKey, String 
sortKey) {
-        return ImmutableMap.<String, AttributeValue>builder()
-                .put(PARTITION_KEY, 
AttributeValue.builder().s(partitionKey).build())
-                .put(SORT_KEY, AttributeValue.builder().n(sortKey).build())
-                .put("string_payload", 
AttributeValue.builder().s("some_strings").build())
-                .put("number_payload", 
AttributeValue.builder().n("1234").build())
-                .build();
+        final Map<String, AttributeValue> item = new HashMap<>();
+        item.put(PARTITION_KEY, 
AttributeValue.builder().s(partitionKey).build());
+        item.put(SORT_KEY, AttributeValue.builder().n(sortKey).build());
+        item.put("string_payload", 
AttributeValue.builder().s("some_strings").build());
+        item.put("number_payload", AttributeValue.builder().n("1234").build());
+        return item;
     }
 
     private Map<String, AttributeValue> itemWithPayload(
             String partitionKey, String sortKey, String payload) {
-        return ImmutableMap.<String, AttributeValue>builder()
-                .put(PARTITION_KEY, 
AttributeValue.builder().s(partitionKey).build())
-                .put(SORT_KEY, AttributeValue.builder().n(sortKey).build())
-                .put("string_payload", 
AttributeValue.builder().s(payload).build())
-                .put("number_payload", 
AttributeValue.builder().n("1234").build())
-                .build();
+        final Map<String, AttributeValue> item = new HashMap<>();
+        item.put(PARTITION_KEY, 
AttributeValue.builder().s(partitionKey).build());
+        item.put(SORT_KEY, AttributeValue.builder().n(sortKey).build());
+        item.put("string_payload", 
AttributeValue.builder().s(payload).build());
+        item.put("number_payload", AttributeValue.builder().n("1234").build());
+        return item;
     }
 
     private static class TestAsyncDynamoDbClientProvider
@@ -530,13 +532,12 @@ public class DynamoDbSinkWriterTest {
                                                             .item()
                                                             .get(PARTITION_KEY)
                                                             .s()))
-                            .collect(Collectors.toList());
+                            .collect(toList());
 
             BatchWriteItemResponse.Builder responseBuilder = 
BatchWriteItemResponse.builder();
             if (!failedRequests.isEmpty()) {
                 responseBuilder =
-                        responseBuilder.unprocessedItems(
-                                ImmutableMap.of(TABLE_NAME, failedRequests));
+                        
responseBuilder.unprocessedItems(singletonMap(TABLE_NAME, failedRequests));
             }
             return CompletableFuture.completedFuture(responseBuilder.build());
         }
@@ -588,13 +589,12 @@ public class DynamoDbSinkWriterTest {
                                                     "Write request cannot be 
empty");
                                         }
                                     })
-                            .collect(Collectors.toList());
+                            .collect(toList());
 
             BatchWriteItemResponse.Builder responseBuilder = 
BatchWriteItemResponse.builder();
             if (!failedRequests.isEmpty()) {
                 responseBuilder =
-                        responseBuilder.unprocessedItems(
-                                ImmutableMap.of(TABLE_NAME, failedRequests));
+                        
responseBuilder.unprocessedItems(singletonMap(TABLE_NAME, failedRequests));
             }
             return CompletableFuture.completedFuture(responseBuilder.build());
         }
diff --git 
a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java
 
b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java
index 779d15c..1c43c3c 100644
--- 
a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java
+++ 
b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.connector.dynamodb.sink;
 
-import com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
 
@@ -28,6 +27,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.singletonMap;
 import static org.assertj.core.api.Assertions.assertThat;
 
 class DynamoDbWriteRequestTest {
@@ -51,7 +51,7 @@ class DynamoDbWriteRequestTest {
         DynamoDbWriteRequest dynamoDbWriteRequest =
                 DynamoDbWriteRequest.builder()
                         .setItem(
-                                ImmutableMap.of(
+                                singletonMap(
                                         "testKey", 
AttributeValue.builder().s("testValue").build()))
                         .setType(DynamoDbWriteRequestType.PUT)
                         .build();
diff --git 
a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java
 
b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java
index 8174817..958efee 100644
--- 
a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java
+++ 
b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java
@@ -21,13 +21,13 @@ package org.apache.flink.connector.dynamodb.sink;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 
-import com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 
+import static java.util.Collections.singletonMap;
 import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.getTestState;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
@@ -42,7 +42,7 @@ public class DynamoDbWriterStateSerializerTest {
                     DynamoDbWriteRequest.builder()
                             .setType(DynamoDbWriteRequestType.PUT)
                             .setItem(
-                                    ImmutableMap.of(
+                                    singletonMap(
                                             "key", 
AttributeValue.builder().s(element).build()))
                             .build();
 
diff --git 
a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
 
b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
index 9a2e276..0bbe132 100644
--- 
a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
+++ 
b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.factories.TableOptionsBuilder;
 import org.apache.flink.table.factories.TestFormatFactory;
 import 
org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
 
-import com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
@@ -40,6 +39,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import static java.util.Collections.singletonMap;
 import static 
org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE;
 import static 
org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT;
 import static 
org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MAX_BATCH_SIZE;
@@ -269,7 +269,7 @@ public class DynamoDbDynamicSinkFactoryTest {
 
         DynamoDbDynamicSink sinkWithStaticPartition =
                 (DynamoDbDynamicSink) createTableSink(sinkSchema, 
partitionKeys, sinkOptions);
-        
sinkWithStaticPartition.applyStaticPartition(ImmutableMap.of("no_op_key", 
"no_op_value"));
+        sinkWithStaticPartition.applyStaticPartition(singletonMap("no_op_key", 
"no_op_value"));
 
         // Verify no-op for applyStaticPartition
         
assertThat(sinkWithStaticPartition).usingRecursiveComparison().isEqualTo(originalSink);
diff --git 
a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java
 
b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java
index 1109118..5566ae1 100644
--- 
a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java
+++ 
b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
 
-import com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
 
@@ -39,6 +38,7 @@ import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.singletonMap;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link RowDataToAttributeValueConverter}. */
@@ -56,7 +56,7 @@ public class RowDataToAttributeValueConverterTest {
                 rowDataToAttributeValueConverter.convertRowData(
                         createElement(StringData.fromString(value)));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(key, 
AttributeValue.builder().s(value).build());
+                singletonMap(key, AttributeValue.builder().s(value).build());
 
         assertThat(actualResult).containsAllEntriesOf(expectedResult);
     }
@@ -73,7 +73,7 @@ public class RowDataToAttributeValueConverterTest {
                 rowDataToAttributeValueConverter.convertRowData(
                         createElement(StringData.fromString(value)));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(key, 
AttributeValue.builder().s(value).build());
+                singletonMap(key, AttributeValue.builder().s(value).build());
 
         assertThat(actualResult).containsAllEntriesOf(expectedResult);
     }
@@ -90,7 +90,7 @@ public class RowDataToAttributeValueConverterTest {
                 rowDataToAttributeValueConverter.convertRowData(
                         createElement(StringData.fromString(value)));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(key, 
AttributeValue.builder().s(value).build());
+                singletonMap(key, AttributeValue.builder().s(value).build());
 
         assertThat(actualResult).containsAllEntriesOf(expectedResult);
     }
@@ -106,7 +106,7 @@ public class RowDataToAttributeValueConverterTest {
         Map<String, AttributeValue> actualResult =
                 
rowDataToAttributeValueConverter.convertRowData(createElement(value));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(key, 
AttributeValue.builder().bool(value).build());
+                singletonMap(key, 
AttributeValue.builder().bool(value).build());
 
         assertThat(actualResult).containsAllEntriesOf(expectedResult);
     }
@@ -123,7 +123,7 @@ public class RowDataToAttributeValueConverterTest {
                 rowDataToAttributeValueConverter.convertRowData(
                         createElement(DecimalData.fromBigDecimal(value, 5, 
4)));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(key, 
AttributeValue.builder().n("1.0010").build());
+                singletonMap(key, 
AttributeValue.builder().n("1.0010").build());
 
         assertThat(actualResult).containsAllEntriesOf(expectedResult);
     }
@@ -139,7 +139,7 @@ public class RowDataToAttributeValueConverterTest {
         Map<String, AttributeValue> actualResult =
                 
rowDataToAttributeValueConverter.convertRowData(createElement(value));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(key, AttributeValue.builder().n("5").build());
+                singletonMap(key, AttributeValue.builder().n("5").build());
 
         assertThat(actualResult).containsAllEntriesOf(expectedResult);
     }
@@ -155,7 +155,7 @@ public class RowDataToAttributeValueConverterTest {
         Map<String, AttributeValue> actualResult =
                 
rowDataToAttributeValueConverter.convertRowData(createElement(value));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(key, 
AttributeValue.builder().n("256").build());
+                singletonMap(key, AttributeValue.builder().n("256").build());
 
         assertThat(actualResult).containsAllEntriesOf(expectedResult);
     }
@@ -171,7 +171,7 @@ public class RowDataToAttributeValueConverterTest {
         Map<String, AttributeValue> actualResult =
                 
rowDataToAttributeValueConverter.convertRowData(createElement(value));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(key, 
AttributeValue.builder().n("65536").build());
+                singletonMap(key, AttributeValue.builder().n("65536").build());
 
         assertThat(actualResult).containsAllEntriesOf(expectedResult);
     }
@@ -187,7 +187,7 @@ public class RowDataToAttributeValueConverterTest {
         Map<String, AttributeValue> actualResult =
                 
rowDataToAttributeValueConverter.convertRowData(createElement(value));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(key, 
AttributeValue.builder().n("4294967295").build());
+                singletonMap(key, 
AttributeValue.builder().n("4294967295").build());
 
         assertThat(actualResult).containsAllEntriesOf(expectedResult);
     }
@@ -203,7 +203,7 @@ public class RowDataToAttributeValueConverterTest {
         Map<String, AttributeValue> actualResult =
                 
rowDataToAttributeValueConverter.convertRowData(createElement(value));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(key, 
AttributeValue.builder().n("1.23456791E17").build());
+                singletonMap(key, 
AttributeValue.builder().n("1.23456791E17").build());
 
         assertThat(actualResult).containsAllEntriesOf(expectedResult);
     }
@@ -219,7 +219,7 @@ public class RowDataToAttributeValueConverterTest {
         Map<String, AttributeValue> actualResult =
                 
rowDataToAttributeValueConverter.convertRowData(createElement(value));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(key, 
AttributeValue.builder().n("1.234567891234568E19").build());
+                singletonMap(key, 
AttributeValue.builder().n("1.234567891234568E19").build());
 
         assertThat(actualResult).containsAllEntriesOf(expectedResult);
     }
@@ -236,7 +236,7 @@ public class RowDataToAttributeValueConverterTest {
                 rowDataToAttributeValueConverter.convertRowData(
                         createElement(TimestampData.fromLocalDateTime(value)));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(key, 
AttributeValue.builder().s("2022-11-10T00:00").build());
+                singletonMap(key, 
AttributeValue.builder().s("2022-11-10T00:00").build());
 
         assertThat(actualResult).containsAllEntriesOf(expectedResult);
     }
@@ -254,7 +254,7 @@ public class RowDataToAttributeValueConverterTest {
                 rowDataToAttributeValueConverter.convertRowData(
                         createArray(value, StringData::fromString));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(
+                singletonMap(
                         key,
                         AttributeValue.builder()
                                 .l(
@@ -278,7 +278,7 @@ public class RowDataToAttributeValueConverterTest {
         Map<String, AttributeValue> actualResult =
                 
rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(
+                singletonMap(
                         key,
                         AttributeValue.builder()
                                 .l(
@@ -307,7 +307,7 @@ public class RowDataToAttributeValueConverterTest {
                 rowDataToAttributeValueConverter.convertRowData(
                         createArray(value, d -> DecimalData.fromBigDecimal(d, 
1, 0)));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(
+                singletonMap(
                         key,
                         AttributeValue.builder()
                                 .l(
@@ -335,7 +335,7 @@ public class RowDataToAttributeValueConverterTest {
         Map<String, AttributeValue> actualResult =
                 
rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(
+                singletonMap(
                         key,
                         AttributeValue.builder()
                                 .l(
@@ -363,7 +363,7 @@ public class RowDataToAttributeValueConverterTest {
         Map<String, AttributeValue> actualResult =
                 
rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(
+                singletonMap(
                         key,
                         AttributeValue.builder()
                                 .l(
@@ -390,7 +390,7 @@ public class RowDataToAttributeValueConverterTest {
         Map<String, AttributeValue> actualResult =
                 
rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(
+                singletonMap(
                         key,
                         AttributeValue.builder()
                                 .l(
@@ -418,7 +418,7 @@ public class RowDataToAttributeValueConverterTest {
         Map<String, AttributeValue> actualResult =
                 
rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(
+                singletonMap(
                         key,
                         AttributeValue.builder()
                                 .l(
@@ -445,7 +445,7 @@ public class RowDataToAttributeValueConverterTest {
         Map<String, AttributeValue> actualResult =
                 
rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(
+                singletonMap(
                         key,
                         AttributeValue.builder()
                                 .l(
@@ -473,7 +473,7 @@ public class RowDataToAttributeValueConverterTest {
         Map<String, AttributeValue> actualResult =
                 
rowDataToAttributeValueConverter.convertRowData(createArray(value, t -> t));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(
+                singletonMap(
                         key,
                         AttributeValue.builder()
                                 .l(
@@ -502,7 +502,7 @@ public class RowDataToAttributeValueConverterTest {
                 rowDataToAttributeValueConverter.convertRowData(
                         createArray(value, TimestampData::fromLocalDateTime));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(
+                singletonMap(
                         key,
                         AttributeValue.builder()
                                 .l(
@@ -531,7 +531,7 @@ public class RowDataToAttributeValueConverterTest {
                 rowDataToAttributeValueConverter.convertRowData(
                         createArray(value, TimestampData::fromInstant));
         Map<String, AttributeValue> expectedResult =
-                ImmutableMap.of(
+                singletonMap(
                         key,
                         AttributeValue.builder()
                                 .l(
diff --git 
a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java
 
b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java
index b44d065..1c128fb 100644
--- 
a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java
+++ 
b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.dynamodb.util;
 import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
 import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;
 
-import com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@@ -67,21 +66,16 @@ public class DynamoDbSerializationUtilTest {
                                 AttributeValue.builder().s("s1").build(),
                                 AttributeValue.builder().s("s2").build())
                         .build());
-        item.put(
-                "map",
-                AttributeValue.builder()
-                        .m(
-                                ImmutableMap.of(
-                                        "key1", 
AttributeValue.builder().s("string").build(),
-                                        "key2", 
AttributeValue.builder().n("12345").build(),
-                                        "binary",
-                                                AttributeValue.builder()
-                                                        .b(
-                                                                
SdkBytes.fromByteArray(
-                                                                        new 
byte[] {1, 2, 3}))
-                                                        .build(),
-                                        "null", 
AttributeValue.builder().nul(true).build()))
-                        .build());
+
+        Map<String, AttributeValue> nestedMap = new HashMap<>();
+        nestedMap.put("key1", AttributeValue.builder().s("string").build());
+        nestedMap.put("key2", AttributeValue.builder().n("12345").build());
+        nestedMap.put(
+                "binary",
+                AttributeValue.builder().b(SdkBytes.fromByteArray(new byte[] {1, 2, 3})).build());
+        nestedMap.put("null", AttributeValue.builder().nul(true).build());
+
+        item.put("map", AttributeValue.builder().m(nestedMap).build());
         DynamoDbWriteRequest dynamoDbWriteRequest =
                 DynamoDbWriteRequest.builder()
                         .setItem(item)
diff --git 
a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilderTest.java
 
b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilderTest.java
index 1bf73ba..b6fb3fa 100644
--- 
a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilderTest.java
+++ 
b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilderTest.java
@@ -21,8 +21,6 @@ package org.apache.flink.connector.dynamodb.util;
 import org.apache.flink.connector.dynamodb.sink.InvalidConfigurationException;
 import org.apache.flink.connector.dynamodb.sink.InvalidRequestException;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@@ -31,8 +29,13 @@ import 
software.amazon.awssdk.services.dynamodb.model.PutRequest;
 import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 
@@ -42,18 +45,21 @@ public class PrimaryKeyBuilderTest {
     private static final String PARTITION_KEY_NAME = "part_key_name";
     private static final String SORT_KEY_NAME = "sort_key_name";
 
-    private ImmutableMap<String, AttributeValue> createItemValues() {
-        return ImmutableMap.of(
+    private Map<String, AttributeValue> createItemValues() {
+        final Map<String, AttributeValue> values = new HashMap<>();
+        values.put(
                 PARTITION_KEY_NAME,
                 AttributeValue.builder()
                         .s("123")
                         .n("456")
                         .b(SdkBytes.fromString("789", StandardCharsets.UTF_8))
-                        .build(),
-                SORT_KEY_NAME,
-                AttributeValue.builder().s("101112").build(),
-                "some_item",
-                AttributeValue.builder().bool(false).build());
+                        .build());
+
+        values.put(SORT_KEY_NAME, 
AttributeValue.builder().s("101112").build());
+
+        values.put("some_item", AttributeValue.builder().bool(false).build());
+
+        return values;
     }
 
     public WriteRequest createPutItemRequest(Map<String, AttributeValue> itemValues) {
@@ -70,30 +76,27 @@ public class PrimaryKeyBuilderTest {
 
     @Test
     public void testPrimaryKeyDelimited() {
-        WriteRequest putRequestOne =
-                createPutItemRequest(
-                        ImmutableMap.of(
-                                PARTITION_KEY_NAME,
-                                AttributeValue.builder().s("ab").build(),
-                                SORT_KEY_NAME,
-                                AttributeValue.builder().s("cd").build()));
-
-        WriteRequest putRequestTwo =
-                createPutItemRequest(
-                        ImmutableMap.of(
-                                PARTITION_KEY_NAME,
-                                AttributeValue.builder().s("a").build(),
-                                SORT_KEY_NAME,
-                                AttributeValue.builder().s("bcd").build()));
+        Map<String, AttributeValue> itemValuesOne = new HashMap<>();
+        itemValuesOne.put(PARTITION_KEY_NAME, 
AttributeValue.builder().s("ab").build());
+        itemValuesOne.put(SORT_KEY_NAME, 
AttributeValue.builder().s("cd").build());
+
+        WriteRequest putRequestOne = createPutItemRequest(itemValuesOne);
+
+        Map<String, AttributeValue> itemValuesTwo = new HashMap<>();
+        itemValuesTwo.put(PARTITION_KEY_NAME, 
AttributeValue.builder().s("a").build());
+        itemValuesTwo.put(SORT_KEY_NAME, 
AttributeValue.builder().s("bcd").build());
+
+        WriteRequest putRequestTwo = createPutItemRequest(itemValuesTwo);
 
         PrimaryKeyBuilder keyBuilder =
-                new PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME, 
SORT_KEY_NAME));
+                new PrimaryKeyBuilder(Arrays.asList(PARTITION_KEY_NAME, 
SORT_KEY_NAME));
+
         
assertThat(keyBuilder.build(putRequestOne)).isNotEqualTo(keyBuilder.build(putRequestTwo));
     }
 
     @Test
     public void testPartitionKeysOfTwoDifferentRequestsEqual() {
-        PrimaryKeyBuilder keyBuilder = new 
PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME));
+        PrimaryKeyBuilder keyBuilder = new 
PrimaryKeyBuilder(singletonList(PARTITION_KEY_NAME));
         assertThat(keyBuilder.build(createPutItemRequest(createItemValues())))
                 
.isEqualTo(keyBuilder.build(createDeleteItemRequest(createItemValues())));
     }
@@ -101,7 +104,7 @@ public class PrimaryKeyBuilderTest {
     @Test
     public void testCompositeKeysOfTwoDifferentRequestsEqual() {
         PrimaryKeyBuilder keyBuilder =
-                new PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME, 
SORT_KEY_NAME));
+                new PrimaryKeyBuilder(Arrays.asList(PARTITION_KEY_NAME, 
SORT_KEY_NAME));
 
         assertThat(keyBuilder.build(createPutItemRequest(createItemValues())))
                 
.isEqualTo(keyBuilder.build(createDeleteItemRequest(createItemValues())));
@@ -110,7 +113,7 @@ public class PrimaryKeyBuilderTest {
     @Test
     public void testExceptOnEmptyPartitionKeys() {
         assertThatExceptionOfType(InvalidConfigurationException.class)
-                .isThrownBy(() -> new PrimaryKeyBuilder(ImmutableList.of()))
+                .isThrownBy(() -> new PrimaryKeyBuilder(emptyList()))
                 .withMessageContaining(
                         "Unable to construct partition key as overwriteByPartitionKeys configuration not provided.");
     }
@@ -120,7 +123,7 @@ public class PrimaryKeyBuilderTest {
         assertThatExceptionOfType(InvalidRequestException.class)
                 .isThrownBy(
                         () ->
-                                new 
PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME))
+                                new 
PrimaryKeyBuilder(singletonList(PARTITION_KEY_NAME))
                                         .build(WriteRequest.builder().build()))
                 .withMessageContaining("Empty write request");
     }
@@ -130,7 +133,7 @@ public class PrimaryKeyBuilderTest {
         assertThatExceptionOfType(InvalidRequestException.class)
                 .isThrownBy(
                         () ->
-                                new 
PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME))
+                                new 
PrimaryKeyBuilder(singletonList(PARTITION_KEY_NAME))
                                         .build(
                                                 WriteRequest.builder()
                                                         
.putRequest(PutRequest.builder().build())
@@ -143,7 +146,7 @@ public class PrimaryKeyBuilderTest {
         assertThatExceptionOfType(InvalidRequestException.class)
                 .isThrownBy(
                         () ->
-                                new 
PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME))
+                                new 
PrimaryKeyBuilder(singletonList(PARTITION_KEY_NAME))
                                         .build(
                                                 WriteRequest.builder()
                                                         .deleteRequest(
@@ -154,26 +157,26 @@ public class PrimaryKeyBuilderTest {
 
     @Test
     public void testExceptWhenNoPartitionKey() {
-        ImmutableMap<String, AttributeValue> itemValues =
-                ImmutableMap.of("some_item", 
AttributeValue.builder().bool(false).build());
+        Map<String, AttributeValue> itemValues =
+                singletonMap("some_item", 
AttributeValue.builder().bool(false).build());
 
         assertThatExceptionOfType(InvalidRequestException.class)
                 .isThrownBy(
                         () ->
-                                new 
PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME))
+                                new 
PrimaryKeyBuilder(singletonList(PARTITION_KEY_NAME))
                                         
.build(createPutItemRequest(itemValues)))
                 .withMessageContaining("does not contain partition key part_key_name.");
     }
 
     @Test
     public void testExceptWhenEmptyKey() {
-        ImmutableMap<String, AttributeValue> itemValues =
-                ImmutableMap.of(PARTITION_KEY_NAME, 
AttributeValue.builder().s("").build());
+        Map<String, AttributeValue> itemValues =
+                singletonMap(PARTITION_KEY_NAME, 
AttributeValue.builder().s("").build());
 
         assertThatExceptionOfType(InvalidRequestException.class)
                 .isThrownBy(
                         () ->
-                                new 
PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME))
+                                new 
PrimaryKeyBuilder(singletonList(PARTITION_KEY_NAME))
                                         
.build(createPutItemRequest(itemValues)))
                 .withMessageContaining(
                         "Partition key or sort key attributes require non-empty values.");
@@ -181,35 +184,29 @@ public class PrimaryKeyBuilderTest {
 
     @Test
     public void testExceptWhenNoPartitionKeyCompositeKey() {
-        ImmutableMap<String, AttributeValue> itemValues =
-                ImmutableMap.of(
-                        SORT_KEY_NAME,
-                        AttributeValue.builder().s("101112").build(),
-                        "some_item",
-                        AttributeValue.builder().bool(false).build());
+        Map<String, AttributeValue> itemValues = new HashMap<>();
+        itemValues.put(SORT_KEY_NAME, 
AttributeValue.builder().s("101112").build());
+        itemValues.put("some_item", 
AttributeValue.builder().bool(false).build());
 
         assertThatExceptionOfType(InvalidRequestException.class)
                 .isThrownBy(
                         () ->
-                                new 
PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME))
+                                new 
PrimaryKeyBuilder(singletonList(PARTITION_KEY_NAME))
                                         
.build(createPutItemRequest(itemValues)))
                 .withMessageContaining("does not contain partition key 
part_key_name.");
     }
 
     @Test
     public void testExceptWhenNoSortKey() {
-        ImmutableMap<String, AttributeValue> itemValues =
-                ImmutableMap.of(
-                        PARTITION_KEY_NAME,
-                        AttributeValue.builder().s("101112").build(),
-                        "some_item",
-                        AttributeValue.builder().bool(false).build());
+        Map<String, AttributeValue> itemValues = new HashMap<>();
+        itemValues.put(PARTITION_KEY_NAME, 
AttributeValue.builder().s("101112").build());
+        itemValues.put("some_item", 
AttributeValue.builder().bool(false).build());
 
         assertThatExceptionOfType(InvalidRequestException.class)
                 .isThrownBy(
                         () ->
                                 new PrimaryKeyBuilder(
-                                                
ImmutableList.of(PARTITION_KEY_NAME, SORT_KEY_NAME))
+                                                
Arrays.asList(PARTITION_KEY_NAME, SORT_KEY_NAME))
                                         
.build(createPutItemRequest(itemValues)))
                 .withMessageContaining("does not contain partition key 
sort_key_name.");
     }
diff --git a/flink-connector-kinesis/src/main/resources/META-INF/NOTICE 
b/flink-connector-kinesis/src/main/resources/META-INF/NOTICE
index 2f5d166..af79451 100644
--- a/flink-connector-kinesis/src/main/resources/META-INF/NOTICE
+++ b/flink-connector-kinesis/src/main/resources/META-INF/NOTICE
@@ -8,7 +8,6 @@ This project bundles the following dependencies under the Apache Software Licens
 
 - software.amazon.ion:ion-java:1.0.2
 - software.amazon.eventstream:eventstream:1.0.1
-- software.amazon.awssdk:arns:2.20.32
 - software.amazon.awssdk:utils:2.20.32
 - software.amazon.awssdk:third-party-jackson-dataformat-cbor:2.20.32
 - software.amazon.awssdk:third-party-jackson-core:2.20.32
@@ -28,6 +27,7 @@ This project bundles the following dependencies under the 
Apache Software Licens
 - software.amazon.awssdk:aws-core:2.20.32
 - software.amazon.awssdk:aws-cbor-protocol:2.20.32
 - software.amazon.awssdk:auth:2.20.32
+- software.amazon.awssdk:arns:2.20.32
 - software.amazon.awssdk:apache-client:2.20.32
 - software.amazon.awssdk:annotations:2.20.32
 - org.apache.httpcomponents:httpcore:4.4.14
@@ -42,10 +42,10 @@ This project bundles the following dependencies under the 
Apache Software Licens
 - io.netty:netty-codec-http:4.1.86.Final
 - io.netty:netty-codec-http2:4.1.86.Final
 - io.netty:netty-buffer:4.1.86.Final
-- com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.13.4
-- com.fasterxml.jackson.core:jackson-databind:2.13.4.2
-- com.fasterxml.jackson.core:jackson-core:2.13.4
-- com.fasterxml.jackson.core:jackson-annotations:2.13.4
+- com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.14.3
+- com.fasterxml.jackson.core:jackson-databind:2.14.3
+- com.fasterxml.jackson.core:jackson-core:2.14.3
+- com.fasterxml.jackson.core:jackson-annotations:2.14.3
 - com.amazonaws:jmespath-java:1.12.439
 - com.amazonaws:dynamodb-streams-kinesis-adapter:1.5.3
 - com.amazonaws:aws-java-sdk-sts:1.12.439
diff --git 
a/flink-sql-connector-aws-kinesis-streams/src/main/resources/META-INF/NOTICE 
b/flink-sql-connector-aws-kinesis-streams/src/main/resources/META-INF/NOTICE
index 2fb089e..3185b03 100644
--- a/flink-sql-connector-aws-kinesis-streams/src/main/resources/META-INF/NOTICE
+++ b/flink-sql-connector-aws-kinesis-streams/src/main/resources/META-INF/NOTICE
@@ -26,6 +26,7 @@ This project bundles the following dependencies under the 
Apache Software Licens
 - software.amazon.awssdk:aws-core:2.20.32
 - software.amazon.awssdk:aws-cbor-protocol:2.20.32
 - software.amazon.awssdk:auth:2.20.32
+- software.amazon.awssdk:arns:2.20.32
 - software.amazon.awssdk:apache-client:2.20.32
 - software.amazon.awssdk:annotations:2.20.32
 - org.apache.httpcomponents:httpcore:4.4.14
diff --git a/pom.xml b/pom.xml
index a1391b8..d5a03ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -158,6 +158,27 @@ under the License.
 
     <dependencyManagement>
         <dependencies>
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-connector-aws-base</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-connector-base</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-table-common</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-avro</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>com.amazonaws</groupId>
                 <artifactId>aws-java-sdk-bom</artifactId>
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
index 5b90006..29ff116 100644
--- a/tools/maven/checkstyle.xml
+++ b/tools/maven/checkstyle.xml
@@ -200,7 +200,7 @@ This file is based on the checkstyle file of Apache Beam.
                        <!-- Checks for out of order import statements. -->
                        <property name="severity" value="error"/>
                        <property name="groups"
-                                         value="org.apache.flink,*,javax,java,scala"/>
+                                         value="org.apache.flink,org.apache.flink.shaded,*,javax,java,scala"/>
                        <property name="separated" value="true"/>
                        <property name="sortStaticImportsAlphabetically" 
value="true"/>
                        <property name="option" value="bottom"/>

Reply via email to