This is an automated email from the ASF dual-hosted git repository.
hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new 5b6f087 [FLINK-34260][Connectors/AWS] Update flink-connector-aws to
be compatible with updated SinkV2 interfaces
5b6f087 is described below
commit 5b6f087815bcf18cf62ba39b2ac1f84f5e72f951
Author: Aleksandr Pilipenko <[email protected]>
AuthorDate: Mon Jan 29 20:17:41 2024 +0000
[FLINK-34260][Connectors/AWS] Update flink-connector-aws to be compatible
with updated SinkV2 interfaces
---
.../flink-connector-aws-kinesis-firehose/pom.xml | 6 ++
.../sink/KinesisFirehoseSinkWriterTest.java | 14 ++---
.../kinesis/sink/KinesisStreamsSinkWriterTest.java | 15 ++---
.../flink-connector-dynamodb/pom.xml | 6 ++
.../connector/dynamodb/sink/DynamoDbSink.java | 22 +++++++-
.../dynamodb/sink/DynamoDbSinkWriterTest.java | 64 ++++++++++++----------
.../flink-connector-kinesis/pom.xml | 6 ++
.../kinesis/FlinkKinesisConsumerTest.java | 2 +-
.../GlueSchemaRegistryAvroSchemaCoderTest.java | 5 +-
...eSchemaRegistryInputStreamDeserializerTest.java | 3 +-
.../flink-json-glue-schema-registry/pom.xml | 8 +++
.../GlueSchemaRegistryJsonSchemaCoderProvider.java | 8 +--
...chemaRegistryJsonDeserializationSchemaTest.java | 5 +-
pom.xml | 12 +++-
14 files changed, 116 insertions(+), 60 deletions(-)
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-firehose/pom.xml
b/flink-connector-aws/flink-connector-aws-kinesis-firehose/pom.xml
index fd2e7b3..68df44e 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-firehose/pom.xml
+++ b/flink-connector-aws/flink-connector-aws-kinesis-firehose/pom.xml
@@ -71,6 +71,12 @@ under the License.
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
b/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
index 29160f7..2d9ddef 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
@@ -48,22 +48,22 @@ public class KinesisFirehoseSinkWriterTest {
.build();
@BeforeEach
- void setup() {
+ void setup() throws IOException {
TestSinkInitContext sinkInitContext = new TestSinkInitContext();
Properties sinkProperties =
AWSServicesTestUtils.createConfig("https://fake_aws_endpoint");
- sinkWriter =
- new KinesisFirehoseSinkWriter<>(
+ KinesisFirehoseSink<String> sink =
+ new KinesisFirehoseSink<>(
ELEMENT_CONVERTER_PLACEHOLDER,
- sinkInitContext,
50,
16,
10000,
- 4 * 1024 * 1024,
- 5000,
- 1000 * 1024,
+ 4 * 1024 * 1024L,
+ 5000L,
+ 1000 * 1024L,
true,
"streamName",
sinkProperties);
+ sinkWriter = (KinesisFirehoseSinkWriter<String>)
sink.createWriter(sinkInitContext);
}
@Test
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java
index eccfe0a..f3c13d4 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.connector.kinesis.sink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
@@ -28,6 +27,7 @@ import
org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRat
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+import java.io.IOException;
import java.util.Properties;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
@@ -55,13 +55,13 @@ public class KinesisStreamsSinkWriterTest {
.build();
@Test
- void
testCreateKinesisStreamsSinkWriterInitializesRateLimitingStrategyWithExpectedParameters()
{
- Sink.InitContext sinkInitContext = new TestSinkInitContext();
+ void
testCreateKinesisStreamsSinkWriterInitializesRateLimitingStrategyWithExpectedParameters()
+ throws IOException {
+ TestSinkInitContext sinkInitContext = new TestSinkInitContext();
Properties sinkProperties =
AWSServicesTestUtils.createConfig("https://fake_aws_endpoint");
- sinkWriter =
- new KinesisStreamsSinkWriter<String>(
+ KinesisStreamsSink<String> sink =
+ new KinesisStreamsSink<>(
ELEMENT_CONVERTER_PLACEHOLDER,
- sinkInitContext,
MAX_BATCH_SIZE,
MAX_INFLIGHT_REQUESTS,
MAX_BUFFERED_REQUESTS,
@@ -70,8 +70,9 @@ public class KinesisStreamsSinkWriterTest {
MAX_RECORD_SIZE,
FAIL_ON_ERROR,
"streamName",
- "StreamARN",
+
"arn:aws:kinesis:us-east-1:000000000000:stream/streamName",
sinkProperties);
+ sinkWriter = (KinesisStreamsSinkWriter<String>)
sink.createWriter(sinkInitContext);
assertThat(sinkWriter)
.extracting("rateLimitingStrategy")
diff --git a/flink-connector-aws/flink-connector-dynamodb/pom.xml
b/flink-connector-aws/flink-connector-dynamodb/pom.xml
index c4bddcf..b4c6715 100644
--- a/flink-connector-aws/flink-connector-dynamodb/pom.xml
+++ b/flink-connector-aws/flink-connector-dynamodb/pom.xml
@@ -87,6 +87,12 @@ under the License.
</dependency>
<!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
diff --git
a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java
b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java
index c90aa19..8f64a67 100644
---
a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java
+++
b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java
@@ -20,12 +20,16 @@ package org.apache.flink.connector.dynamodb.sink;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import
org.apache.flink.connector.dynamodb.sink.client.DynamoDbAsyncClientProvider;
+import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
@@ -80,6 +84,7 @@ public class DynamoDbSink<InputT> extends
AsyncSinkBase<InputT, DynamoDbWriteReq
private final boolean failOnError;
private final String tableName;
private final List<String> overwriteByPartitionKeys;
+ private transient SdkClientProvider<DynamoDbAsyncClient>
asyncClientSdkClientProviderOverride;
protected DynamoDbSink(
ElementConverter<InputT, DynamoDbWriteRequest> elementConverter,
@@ -152,7 +157,7 @@ public class DynamoDbSink<InputT> extends
AsyncSinkBase<InputT, DynamoDbWriteReq
failOnError,
tableName,
overwriteByPartitionKeys,
- new DynamoDbAsyncClientProvider(dynamoDbClientProperties),
+ getAsyncClientProvider(dynamoDbClientProperties),
recoveredState);
}
@@ -162,4 +167,19 @@ public class DynamoDbSink<InputT> extends
AsyncSinkBase<InputT, DynamoDbWriteReq
getWriterStateSerializer() {
return new DynamoDbWriterStateSerializer();
}
+
+ private SdkClientProvider<DynamoDbAsyncClient> getAsyncClientProvider(
+ Properties clientProperties) {
+ if (asyncClientSdkClientProviderOverride != null) {
+ return asyncClientSdkClientProviderOverride;
+ }
+ return new DynamoDbAsyncClientProvider(clientProperties);
+ }
+
+ @Internal
+ @VisibleForTesting
+ void setDynamoDbAsyncClientProvider(
+ SdkClientProvider<DynamoDbAsyncClient>
asyncClientSdkClientProviderOverride) {
+ this.asyncClientSdkClientProviderOverride =
asyncClientSdkClientProviderOverride;
+ }
}
diff --git
a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
index d37e184..f0d139f 100644
---
a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
+++
b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.connector.dynamodb.sink;
-import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;
@@ -39,6 +38,7 @@ import
software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
import software.amazon.awssdk.services.sts.model.StsException;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -46,6 +46,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -174,7 +175,7 @@ public class DynamoDbSinkWriterTest {
}
@Test
- public void testRetryableExceptionWhenFailOnErrorOnWillNotRetry() {
+ public void testRetryableExceptionWhenFailOnErrorOnWillNotRetry() throws
IOException {
Optional<Exception> exceptionToThrow = getGenericRetryableException();
boolean failOnError = true;
@@ -234,7 +235,7 @@ public class DynamoDbSinkWriterTest {
}
@Test
- public void testNonRetryableExceptionWhenFailOnErrorOnWillNotRetry() {
+ public void testNonRetryableExceptionWhenFailOnErrorOnWillNotRetry()
throws IOException {
Optional<Exception> exceptionToThrow =
getGenericNonRetryableException();
boolean failOnError = true;
@@ -242,7 +243,7 @@ public class DynamoDbSinkWriterTest {
}
@Test
- public void testNonRetryableExceptionWhenFailOnErrorOffWillNotRetry() {
+ public void testNonRetryableExceptionWhenFailOnErrorOffWillNotRetry()
throws IOException {
Optional<Exception> exceptionToThrow =
getGenericNonRetryableException();
boolean failOnError = false;
@@ -250,7 +251,7 @@ public class DynamoDbSinkWriterTest {
}
@Test
- public void testInterruptedExceptionIsNonRetryable() {
+ public void testInterruptedExceptionIsNonRetryable() throws IOException {
Optional<Exception> exceptionToThrow = Optional.of(new
InterruptedException());
boolean failOnError = false;
@@ -258,7 +259,7 @@ public class DynamoDbSinkWriterTest {
}
@Test
- public void testInvalidCredentialsExceptionIsNonRetryable() {
+ public void testInvalidCredentialsExceptionIsNonRetryable() throws
IOException {
Optional<Exception> exceptionToThrow =
Optional.of(StsException.builder().build());
boolean failOnError = false;
@@ -266,7 +267,7 @@ public class DynamoDbSinkWriterTest {
}
@Test
- public void testResourceNotFoundExceptionIsNonRetryable() {
+ public void testResourceNotFoundExceptionIsNonRetryable() throws
IOException {
Optional<Exception> exceptionToThrow =
Optional.of(ResourceNotFoundException.builder().build());
boolean failOnError = false;
@@ -275,7 +276,7 @@ public class DynamoDbSinkWriterTest {
}
@Test
- public void testConditionalCheckFailedExceptionIsNonRetryable() {
+ public void testConditionalCheckFailedExceptionIsNonRetryable() throws
IOException {
Optional<Exception> exceptionToThrow =
Optional.of(ConditionalCheckFailedException.builder().build());
boolean failOnError = false;
@@ -284,7 +285,7 @@ public class DynamoDbSinkWriterTest {
}
@Test
- public void testValidationExceptionIsNonRetryable() {
+ public void testValidationExceptionIsNonRetryable() throws IOException {
Optional<Exception> exceptionToThrow =
Optional.of(
DynamoDbException.builder()
@@ -299,7 +300,7 @@ public class DynamoDbSinkWriterTest {
}
@Test
- public void testSdkClientExceptionIsNonRetryable() {
+ public void testSdkClientExceptionIsNonRetryable() throws IOException {
Optional<Exception> exceptionToThrow =
Optional.of(SdkClientException.builder().build());
boolean failOnError = false;
@@ -307,7 +308,7 @@ public class DynamoDbSinkWriterTest {
}
@Test
- public void testGetSizeInBytesNotImplemented() {
+ public void testGetSizeInBytesNotImplemented() throws IOException {
DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
getDefaultSinkWriter(
false, Collections.emptyList(), () -> new
TrackingDynamoDbAsyncClient());
@@ -315,7 +316,7 @@ public class DynamoDbSinkWriterTest {
}
@Test
- public void testClientClosesWhenWriterIsClosed() {
+ public void testClientClosesWhenWriterIsClosed() throws IOException {
TestAsyncDynamoDbClientProvider testAsyncDynamoDbClientProvider =
new TestAsyncDynamoDbClientProvider(new
TrackingDynamoDbAsyncClient());
DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
@@ -327,7 +328,7 @@ public class DynamoDbSinkWriterTest {
}
private void assertThatRequestsAreNotRetried(
- boolean failOnError, Optional<Exception> exceptionToThrow) {
+ boolean failOnError, Optional<Exception> exceptionToThrow) throws
IOException {
ThrowingDynamoDbAsyncClient<Exception> throwingDynamoDbAsyncClient =
new ThrowingDynamoDbAsyncClient<>(exceptionToThrow, str ->
true);
DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
@@ -343,7 +344,8 @@ public class DynamoDbSinkWriterTest {
private DynamoDbSinkWriter<Map<String, AttributeValue>>
getDefaultSinkWriter(
boolean failOnError,
List<String> overwriteByPartitionKeys,
- Supplier<DynamoDbAsyncClient> clientSupplier) {
+ Supplier<DynamoDbAsyncClient> clientSupplier)
+ throws IOException {
return getDefaultSinkWriter(
failOnError,
overwriteByPartitionKeys,
@@ -353,22 +355,24 @@ public class DynamoDbSinkWriterTest {
private DynamoDbSinkWriter<Map<String, AttributeValue>>
getDefaultSinkWriter(
boolean failOnError,
List<String> overwriteByPartitionKeys,
- SdkClientProvider<DynamoDbAsyncClient>
dynamoDbAsyncClientProvider) {
- Sink.InitContext initContext = new TestSinkInitContext();
- return new DynamoDbSinkWriter(
- new TestDynamoDbElementConverter(),
- initContext,
- 2,
- 1,
- 10,
- 1024,
- 1000,
- 1024,
- failOnError,
- TABLE_NAME,
- overwriteByPartitionKeys,
- dynamoDbAsyncClientProvider,
- Collections.emptyList());
+ SdkClientProvider<DynamoDbAsyncClient> dynamoDbAsyncClientProvider)
+ throws IOException {
+ TestSinkInitContext initContext = new TestSinkInitContext();
+ DynamoDbSink<Map<String, AttributeValue>> sink =
+ new DynamoDbSink<>(
+ new TestDynamoDbElementConverter(),
+ 2,
+ 1,
+ 10,
+ 1024,
+ 1000,
+ 1024,
+ failOnError,
+ TABLE_NAME,
+ overwriteByPartitionKeys,
+ new Properties());
+ sink.setDynamoDbAsyncClientProvider(dynamoDbAsyncClientProvider);
+ return (DynamoDbSinkWriter<Map<String, AttributeValue>>)
sink.createWriter(initContext);
}
private List<DynamoDbWriteRequest> getDefaultInputRequests() {
diff --git a/flink-connector-aws/flink-connector-kinesis/pom.xml
b/flink-connector-aws/flink-connector-kinesis/pom.xml
index b345f4a..2ffd448 100644
--- a/flink-connector-aws/flink-connector-kinesis/pom.xml
+++ b/flink-connector-aws/flink-connector-kinesis/pom.xml
@@ -137,6 +137,12 @@ under the License.
</dependency>
<!-- Flink ecosystem -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
diff --git
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index d873c4e..f8cd5ab 100644
---
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -284,7 +284,7 @@ public class FlinkKinesisConsumerTest extends TestLogger {
new FlinkKinesisConsumer<>("fakeStream", new
SimpleStringSchema(), config);
FlinkKinesisConsumer<?> mockedConsumer = spy(consumer);
- RuntimeContext context = new MockStreamingRuntimeContext(true, 1, 1);
+ RuntimeContext context = new MockStreamingRuntimeContext(true, 1, 0);
mockedConsumer.setRuntimeContext(context);
mockedConsumer.initializeState(initializationContext);
diff --git
a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderTest.java
b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderTest.java
index b05eaa7..fa55a4a 100644
---
a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderTest.java
+++
b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderTest.java
@@ -25,7 +25,6 @@ import
com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDes
import
com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import
com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializationFacade;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
-import lombok.NonNull;
import org.apache.avro.Schema;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -208,9 +207,7 @@ class GlueSchemaRegistryAvroSchemaCoderTest {
@Override
public UUID getSchemaVersionIdByDefinition(
- @NonNull String schemaDefinition,
- @NonNull String schemaName,
- @NonNull String dataFormat) {
+ String schemaDefinition, String schemaName, String dataFormat)
{
EntityNotFoundException entityNotFoundException =
EntityNotFoundException.builder()
.message(AWSSchemaRegistryConstants.SCHEMA_NOT_FOUND_MSG)
diff --git
a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryInputStreamDeserializerTest.java
b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryInputStreamDeserializerTest.java
index 682e7bf..37bd90f 100644
---
a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryInputStreamDeserializerTest.java
+++
b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryInputStreamDeserializerTest.java
@@ -26,7 +26,6 @@ import
com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDes
import
com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import
com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants.COMPRESSION;
-import lombok.NonNull;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
@@ -293,7 +292,7 @@ class GlueSchemaRegistryInputStreamDeserializerTest {
}
@Override
- public String getSchemaDefinition(@NonNull byte[] data) {
+ public String getSchemaDefinition(byte[] data) {
return schema.getSchemaDefinition();
}
diff --git a/flink-formats-aws/flink-json-glue-schema-registry/pom.xml
b/flink-formats-aws/flink-json-glue-schema-registry/pom.xml
index 0af1c75..7f34db6 100644
--- a/flink-formats-aws/flink-json-glue-schema-registry/pom.xml
+++ b/flink-formats-aws/flink-json-glue-schema-registry/pom.xml
@@ -61,6 +61,14 @@ under the License.
<version>${glue.schema.registry.version}</version>
</dependency>
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>1.18.30</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- ArchUit test dependencies -->
<dependency>
diff --git
a/flink-formats-aws/flink-json-glue-schema-registry/src/main/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSchemaCoderProvider.java
b/flink-formats-aws/flink-json-glue-schema-registry/src/main/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSchemaCoderProvider.java
index bb9902b..df45215 100644
---
a/flink-formats-aws/flink-json-glue-schema-registry/src/main/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSchemaCoderProvider.java
+++
b/flink-formats-aws/flink-json-glue-schema-registry/src/main/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSchemaCoderProvider.java
@@ -20,11 +20,11 @@ package org.apache.flink.formats.json.glue.schema.registry;
import org.apache.flink.annotation.PublicEvolving;
-import lombok.NonNull;
-
import java.io.Serializable;
import java.util.Map;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/** Provider for {@link GlueSchemaRegistryJsonSchemaCoder}. */
@PublicEvolving
public class GlueSchemaRegistryJsonSchemaCoderProvider implements Serializable
{
@@ -40,9 +40,9 @@ public class GlueSchemaRegistryJsonSchemaCoderProvider
implements Serializable {
* @param configs configurations for AWS Glue Schema Registry
*/
public GlueSchemaRegistryJsonSchemaCoderProvider(
- String transportName, @NonNull Map<String, Object> configs) {
+ String transportName, Map<String, Object> configs) {
this.transportName = transportName;
- this.configs = configs;
+ this.configs = checkNotNull(configs);
}
public GlueSchemaRegistryJsonSchemaCoder get() {
diff --git
a/flink-formats-aws/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonDeserializationSchemaTest.java
b/flink-formats-aws/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonDeserializationSchemaTest.java
index 83c09eb..ec7f46e 100644
---
a/flink-formats-aws/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonDeserializationSchemaTest.java
+++
b/flink-formats-aws/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonDeserializationSchemaTest.java
@@ -23,7 +23,6 @@ import
com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDes
import
com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import
com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
-import lombok.NonNull;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -160,7 +159,7 @@ class GlueSchemaRegistryJsonDeserializationSchemaTest {
}
@Override
- public Object deserialize(@NonNull AWSDeserializerInput
deserializerInput)
+ public Object deserialize(AWSDeserializerInput deserializerInput)
throws AWSSchemaRegistryException {
return userDefinedPojo;
}
@@ -174,7 +173,7 @@ class GlueSchemaRegistryJsonDeserializationSchemaTest {
}
@Override
- public Object deserialize(@NonNull AWSDeserializerInput
deserializerInput)
+ public Object deserialize(AWSDeserializerInput deserializerInput)
throws AWSSchemaRegistryException {
return userSchema;
}
diff --git a/pom.xml b/pom.xml
index f9f0ade..54ec609 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,7 +58,7 @@ under the License.
<netty.version>4.1.86.Final</netty.version>
<flink.version>1.17.0</flink.version>
<jackson-bom.version>2.14.3</jackson-bom.version>
- <glue.schema.registry.version>1.1.14</glue.schema.registry.version>
+ <glue.schema.registry.version>1.1.18</glue.schema.registry.version>
<guava.version>32.1.3-jre</guava.version>
<junit5.version>5.8.1</junit5.version>
@@ -341,6 +341,16 @@ under the License.
<artifactId>amazon-kinesis-client</artifactId>
<version>1.14.8</version>
</dependency>
+ <dependency>
+ <groupId>com.squareup.okio</groupId>
+ <artifactId>okio</artifactId>
+ <version>3.4.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okio</groupId>
+ <artifactId>okio-jvm</artifactId>
+ <version>3.4.0</version>
+ </dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-common</artifactId>