This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new a62e3d8 [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of
Stream Name for Kinesis API calls
a62e3d8 is described below
commit a62e3d866338f71345a92ac9d7e056eba603a763
Author: Hong Liang Teoh <[email protected]>
AuthorDate: Wed Apr 26 18:29:18 2023 +0100
[FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Name for
Kinesis API calls
---
flink-connector-aws-kinesis-streams/pom.xml | 5 +++
.../connector/kinesis/sink/KinesisStreamsSink.java | 29 +++++++++++----
.../kinesis/sink/KinesisStreamsSinkBuilder.java | 17 ++++++---
.../kinesis/sink/KinesisStreamsSinkWriter.java | 15 +++++++-
.../sink/KinesisStreamsSinkBuilderTest.java | 29 ++++++++++++++-
.../kinesis/sink/KinesisStreamsSinkITCase.java | 41 +++++++++++++---------
flink-connector-kinesis/pom.xml | 4 +++
.../connectors/kinesis/proxy/KinesisProxy.java | 31 +++++++++++++---
.../src/main/resources/META-INF/NOTICE | 17 ++++-----
.../kinesis/proxy/DynamoDBStreamsProxyTest.java | 4 ++-
.../connectors/kinesis/proxy/KinesisProxyTest.java | 39 ++++++++++++++++++--
.../testutils/FakeKinesisClientFactory.java | 32 ++++++++++++-----
pom.xml | 2 +-
13 files changed, 210 insertions(+), 55 deletions(-)
diff --git a/flink-connector-aws-kinesis-streams/pom.xml
b/flink-connector-aws-kinesis-streams/pom.xml
index 6c05ff1..eea58f5 100644
--- a/flink-connector-aws-kinesis-streams/pom.xml
+++ b/flink-connector-aws-kinesis-streams/pom.xml
@@ -58,6 +58,11 @@ under the License.
<artifactId>kinesis</artifactId>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>arns</artifactId>
+ </dependency>
+
<!--Table API dependencies-->
<dependency>
<groupId>org.apache.flink</groupId>
diff --git
a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java
b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java
index b6cef5a..5b46196 100644
---
a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java
+++
b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java
@@ -25,6 +25,7 @@ import
org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.Preconditions;
+import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import java.io.IOException;
@@ -68,6 +69,7 @@ public class KinesisStreamsSink<InputT> extends
AsyncSinkBase<InputT, PutRecords
private final boolean failOnError;
private final String streamName;
+ private final String streamArn;
private final Properties kinesisClientProperties;
KinesisStreamsSink(
@@ -80,6 +82,7 @@ public class KinesisStreamsSink<InputT> extends
AsyncSinkBase<InputT, PutRecords
Long maxRecordSizeInBytes,
boolean failOnError,
String streamName,
+ String streamArn,
Properties kinesisClientProperties) {
super(
elementConverter,
@@ -89,13 +92,23 @@ public class KinesisStreamsSink<InputT> extends
AsyncSinkBase<InputT, PutRecords
maxBatchSizeInBytes,
maxTimeInBufferMS,
maxRecordSizeInBytes);
- this.streamName =
- Preconditions.checkNotNull(
- streamName,
- "The stream name must not be null when initializing
the KDS Sink.");
- Preconditions.checkArgument(
- !this.streamName.isEmpty(),
- "The stream name must be set when initializing the KDS Sink.");
+
+ // Ensure that either streamName or streamArn is set. If both are set,
streamArn takes
+ // precedence.
+ if (streamArn != null) {
+ final Arn arn = Arn.fromString(streamArn);
+ this.streamArn = arn.toString();
+ this.streamName = arn.resource().resource();
+ } else {
+ this.streamArn = null;
+ this.streamName =
+ Preconditions.checkNotNull(
+ streamName,
+ "The stream name must not be null when
initializing the KDS Sink.");
+ Preconditions.checkArgument(
+ !this.streamName.isEmpty(),
+ "The stream name must be set when initializing the KDS
Sink.");
+ }
this.failOnError = failOnError;
this.kinesisClientProperties = kinesisClientProperties;
}
@@ -126,6 +139,7 @@ public class KinesisStreamsSink<InputT> extends
AsyncSinkBase<InputT, PutRecords
getMaxRecordSizeInBytes(),
failOnError,
streamName,
+ streamArn,
kinesisClientProperties,
Collections.emptyList());
}
@@ -154,6 +168,7 @@ public class KinesisStreamsSink<InputT> extends
AsyncSinkBase<InputT, PutRecords
getMaxRecordSizeInBytes(),
failOnError,
streamName,
+ streamArn,
kinesisClientProperties,
recoveredState);
}
diff --git
a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilder.java
b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilder.java
index 3e6f7ec..b718a78 100644
---
a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilder.java
+++
b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilder.java
@@ -30,13 +30,13 @@ import java.util.Properties;
* Builder to construct {@link KinesisStreamsSink}.
*
* <p>The following example shows the minimum setup to create a {@link
KinesisStreamsSink} that
- * writes String values to a Kinesis Data Streams stream named
your_stream_here.
+ * writes String values to a Kinesis Data Streams stream.
*
* <pre>{@code
* KinesisStreamsSink<String> kdsSink =
* KinesisStreamsSink.<String>builder()
* .setElementConverter(elementConverter)
- * .setStreamName("your_stream_name")
+ * .setStreamArn("your_stream_arn")
* .setSerializationSchema(new SimpleStringSchema())
* .setPartitionKeyGenerator(element ->
String.valueOf(element.hashCode()))
* .build();
@@ -71,6 +71,7 @@ public class KinesisStreamsSinkBuilder<InputT>
private Boolean failOnError;
private String streamName;
+ private String streamArn;
private Properties kinesisClientProperties;
private SerializationSchema<InputT> serializationSchema;
private PartitionKeyGenerator<InputT> partitionKeyGenerator;
@@ -78,18 +79,23 @@ public class KinesisStreamsSinkBuilder<InputT>
KinesisStreamsSinkBuilder() {}
/**
- * Sets the name of the KDS stream that the sink will connect to. There is
no default for this
- * parameter, therefore, this must be provided at sink creation time
otherwise the build will
- * fail.
+ * Providing the stream name is deprecated. Please provide stream ARN
using {@link
+ * KinesisStreamsSinkBuilder#setStreamArn} instead.
*
* @param streamName the name of the stream
* @return {@link KinesisStreamsSinkBuilder} itself
*/
+ @Deprecated
public KinesisStreamsSinkBuilder<InputT> setStreamName(String streamName) {
this.streamName = streamName;
return this;
}
+ public KinesisStreamsSinkBuilder<InputT> setStreamArn(String streamArn) {
+ this.streamArn = streamArn;
+ return this;
+ }
+
public KinesisStreamsSinkBuilder<InputT> setSerializationSchema(
SerializationSchema<InputT> serializationSchema) {
this.serializationSchema = serializationSchema;
@@ -129,6 +135,7 @@ public class KinesisStreamsSinkBuilder<InputT>
Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B),
Optional.ofNullable(failOnError).orElse(DEFAULT_FAIL_ON_ERROR),
streamName,
+ streamArn,
Optional.ofNullable(kinesisClientProperties).orElse(new
Properties()));
}
}
diff --git
a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
index c1bf1ff..2ff49cb 100644
---
a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
+++
b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
@@ -17,6 +17,7 @@
package org.apache.flink.connector.kinesis.sink;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.aws.util.AWSClientUtil;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
@@ -59,6 +60,7 @@ import static
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptio
* 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID}
and {@code
* AWS_SECRET_ACCESS_KEY} through environment variables etc.
*/
+@Internal
class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT,
PutRecordsRequestEntry> {
private static final Logger LOG =
LoggerFactory.getLogger(KinesisStreamsSinkWriter.class);
@@ -82,6 +84,9 @@ class KinesisStreamsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, PutRecord
/* Name of the stream in Kinesis Data Streams */
private final String streamName;
+ /* ARN of the stream in Kinesis Data Streams */
+ private final String streamArn;
+
/* The sink writer metric group */
private final SinkWriterMetricGroup metrics;
@@ -105,6 +110,7 @@ class KinesisStreamsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, PutRecord
long maxRecordSizeInBytes,
boolean failOnError,
String streamName,
+ String streamArn,
Properties kinesisClientProperties) {
this(
elementConverter,
@@ -117,6 +123,7 @@ class KinesisStreamsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, PutRecord
maxRecordSizeInBytes,
failOnError,
streamName,
+ streamArn,
kinesisClientProperties,
Collections.emptyList());
}
@@ -132,6 +139,7 @@ class KinesisStreamsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, PutRecord
long maxRecordSizeInBytes,
boolean failOnError,
String streamName,
+ String streamArn,
Properties kinesisClientProperties,
Collection<BufferedRequestState<PutRecordsRequestEntry>> states) {
super(
@@ -148,6 +156,7 @@ class KinesisStreamsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, PutRecord
states);
this.failOnError = failOnError;
this.streamName = streamName;
+ this.streamArn = streamArn;
this.metrics = context.metricGroup();
this.numRecordsOutErrorsCounter =
metrics.getNumRecordsOutErrorsCounter();
this.httpClient =
AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);
@@ -172,7 +181,11 @@ class KinesisStreamsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, PutRecord
Consumer<List<PutRecordsRequestEntry>> requestResult) {
PutRecordsRequest batchRequest =
-
PutRecordsRequest.builder().records(requestEntries).streamName(streamName).build();
+ PutRecordsRequest.builder()
+ .records(requestEntries)
+ .streamName(streamName)
+ .streamARN(streamArn)
+ .build();
CompletableFuture<PutRecordsResponse> future =
kinesisClient.putRecords(batchRequest);
diff --git
a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilderTest.java
b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilderTest.java
index ce88c63..539decc 100644
---
a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilderTest.java
+++
b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilderTest.java
@@ -39,7 +39,7 @@ class KinesisStreamsSinkBuilderTest {
}
@Test
- void streamNameOfSinkMustBeSetWhenBuilt() {
+ void streamNameOrStreamArnMustBeSetWhenBuilt() {
Assertions.assertThatExceptionOfType(NullPointerException.class)
.isThrownBy(
() ->
@@ -65,6 +65,33 @@ class KinesisStreamsSinkBuilderTest {
"The stream name must be set when initializing the KDS
Sink.");
}
+ @Test
+ void streamArnMustNotBeEmptyWhenBuilt() {
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ KinesisStreamsSink.<String>builder()
+
.setPartitionKeyGenerator(PARTITION_KEY_GENERATOR)
+
.setSerializationSchema(SERIALIZATION_SCHEMA)
+ .setStreamArn("")
+ .build())
+ .withMessageContaining("Malformed ARN - doesn't start with
'arn:'");
+ }
+
+ @Test
+ void streamArnResouceMustNotBeEmptyWhenBuilt() {
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ KinesisStreamsSink.<String>builder()
+
.setPartitionKeyGenerator(PARTITION_KEY_GENERATOR)
+
.setSerializationSchema(SERIALIZATION_SCHEMA)
+ .setStreamArn(
+
"arn:aws:kinesis:us-east-1:000000000000:stream/")
+ .build())
+ .withMessageContaining("resource must not be blank or empty.");
+ }
+
@Test
void serializationSchemaMustBeSetWhenSinkIsBuilt() {
Assertions.assertThatExceptionOfType(NullPointerException.class)
diff --git
a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
index 2360558..cee11e5 100644
---
a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
+++
b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
@@ -173,7 +173,8 @@ class KinesisStreamsSinkITCase {
.runScenario())
.havingCause()
.havingCause()
- .withMessageContaining("Encountered non-recoverable
exception");
+ .havingCause()
+ .withMessageContaining("Stream non-existent-stream under
account");
}
@Test
@@ -343,18 +344,21 @@ class KinesisStreamsSinkITCase {
false, "WRONG", "Invalid AWS Credential Provider Type");
}
- private void credentialsProvidedThroughProfilePathShouldResultInFailure(
- boolean failOnError,
- String credentialsProvider,
- String credentialsProfileLocation,
- String expectedMessage) {
- Properties properties =
-
getDefaultPropertiesWithoutCredentialsSetAndCredentialProvider(credentialsProvider);
- properties.put(
- AWSConfigConstants.profilePath(AWS_CREDENTIALS_PROVIDER),
- credentialsProfileLocation);
- assertRunWithPropertiesAndStreamShouldFailWithExceptionOfType(
- failOnError, properties, expectedMessage);
+ @Test
+ void streamArnShouldTakePrecedenceOverStreamName() {
+ Assertions.assertThatExceptionOfType(JobExecutionException.class)
+ .isThrownBy(
+ () ->
+ new Scenario()
+
.withKinesaliteStreamName("stream-exists")
+ .withSinkConnectionStreamArn(
+
"arn:aws:kinesis:us-east-1:000000000000:stream/stream-not-exists")
+
.withSinkConnectionStreamName("stream-exists")
+ .withFailOnError(true)
+ .withProperties(getDefaultProperties())
+ .runScenario())
+ .withStackTraceContaining(
+ "Stream stream-not-exists under account 000000000000
not found");
}
private void
noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(
@@ -375,9 +379,7 @@ class KinesisStreamsSinkITCase {
.withFailOnError(failOnError)
.withProperties(properties)
.runScenario())
- .havingCause()
- .havingCause()
- .withMessageContaining(expectedMessage);
+ .withStackTraceContaining(expectedMessage);
}
private Properties
getDefaultPropertiesWithoutCredentialsSetAndCredentialProvider(
@@ -408,6 +410,7 @@ class KinesisStreamsSinkITCase {
private boolean failOnError = false;
private String kinesaliteStreamName = null;
private String sinkConnectionStreamName;
+ private String sinkConnectionStreamArn;
private SerializationSchema<String> serializationSchema =
KinesisStreamsSinkITCase.this.serializationSchema;
private PartitionKeyGenerator<String> partitionKeyGenerator =
@@ -440,6 +443,7 @@ class KinesisStreamsSinkITCase {
.setFailOnError(failOnError)
.setMaxBufferedRequests(1000)
.setStreamName(sinkConnectionStreamName)
+ .setStreamArn(sinkConnectionStreamArn)
.setKinesisClientProperties(properties)
.setFailOnError(true)
.build();
@@ -509,6 +513,11 @@ class KinesisStreamsSinkITCase {
return this;
}
+ public Scenario withSinkConnectionStreamArn(String
sinkConnectionStreamArn) {
+ this.sinkConnectionStreamArn = sinkConnectionStreamArn;
+ return this;
+ }
+
public Scenario withKinesaliteStreamName(String kinesaliteStreamName) {
this.kinesaliteStreamName = kinesaliteStreamName;
return this;
diff --git a/flink-connector-kinesis/pom.xml b/flink-connector-kinesis/pom.xml
index 1194941..e2b0f78 100644
--- a/flink-connector-kinesis/pom.xml
+++ b/flink-connector-kinesis/pom.xml
@@ -74,6 +74,10 @@ under the License.
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-cloudwatch</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-core</artifactId>
+ </dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
diff --git
a/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
b/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index bfa926e..b5b2b37 100644
---
a/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++
b/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -58,6 +58,7 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -140,6 +141,9 @@ public class KinesisProxy implements KinesisProxyInterface {
/** Exponential backoff power constant for the describe stream operation.
*/
private final double describeStreamExpConstant;
+ /** Caches retrieved stream ARNs for give stream names. */
+ private final Map<String, String> streamNameToArnLookup = new
ConcurrentHashMap<>();
+
/**
* Create a new KinesisProxy based on the supplied configuration
properties.
*
@@ -350,6 +354,7 @@ public class KinesisProxy implements KinesisProxyInterface {
GetShardIteratorRequest getShardIteratorRequest =
new GetShardIteratorRequest()
.withStreamName(shard.getStreamName())
+ .withStreamARN(lookupStreamArn(shard.getStreamName()))
.withShardId(shard.getShard().getShardId())
.withShardIteratorType(shardIteratorType);
@@ -464,13 +469,15 @@ public class KinesisProxy implements
KinesisProxyInterface {
String streamName, @Nullable String lastSeenShardId) throws
InterruptedException {
List<StreamShardHandle> shardsOfStream = new ArrayList<>();
+ String streamArn = lookupStreamArn(streamName);
+
// List Shards returns just the first 1000 shard entries. In order to
read the entire
// stream,
// we need to use the returned nextToken to get additional shards.
ListShardsResult listShardsResult;
String startShardToken = null;
do {
- listShardsResult = listShards(streamName, lastSeenShardId,
startShardToken);
+ listShardsResult = listShards(streamName, streamArn,
lastSeenShardId, startShardToken);
if (listShardsResult == null) {
// In case we have exceptions while retrieving all shards,
ensure that incomplete
// shard list is not returned.
@@ -488,6 +495,15 @@ public class KinesisProxy implements KinesisProxyInterface
{
return shardsOfStream;
}
+ private synchronized String lookupStreamArn(String streamName) throws
InterruptedException {
+ if (streamNameToArnLookup.containsKey(streamName)) {
+ return streamNameToArnLookup.get(streamName);
+ }
+ String streamArn = describeStream(streamName,
null).getStreamDescription().getStreamARN();
+ streamNameToArnLookup.put(streamName, streamArn);
+ return streamArn;
+ }
+
/**
* Get metainfo for a Kinesis stream, which contains information about
which shards this Kinesis
* stream possess.
@@ -498,18 +514,23 @@ public class KinesisProxy implements
KinesisProxyInterface {
* subtask's fetcher. This jitter backoff approach will help distribute
calls across the
* fetchers over time.
*
- * @param streamName the stream to describe
+ * @param streamName the stream name
+ * @param streamArn the stream ARN
* @param startShardId which shard to start with for this describe
operation (earlier shard's
* infos will not appear in result)
* @return the result of the describe stream operation
*/
private ListShardsResult listShards(
- String streamName, @Nullable String startShardId, @Nullable String
startNextToken)
+ String streamName,
+ String streamArn,
+ @Nullable String startShardId,
+ @Nullable String startNextToken)
throws InterruptedException {
final ListShardsRequest listShardsRequest = new ListShardsRequest();
if (startNextToken == null) {
listShardsRequest.setExclusiveStartShardId(startShardId);
listShardsRequest.setStreamName(streamName);
+ listShardsRequest.setStreamARN(streamArn);
} else {
// Note the nextToken returned by AWS expires within 300 sec.
listShardsRequest.setNextToken(startNextToken);
@@ -535,7 +556,7 @@ public class KinesisProxy implements KinesisProxyInterface {
retryCount++);
LOG.warn(
"Got LimitExceededException when listing shards from
stream "
- + streamName
+ + streamArn
+ ". Backing off for "
+ backoffMillis
+ " millis.");
@@ -567,7 +588,7 @@ public class KinesisProxy implements KinesisProxyInterface {
retryCount++);
LOG.warn(
"Got SdkClientException when listing shards from
stream {}. Backing off for {} millis.",
- streamName,
+ streamArn,
backoffMillis);
BACKOFF.sleep(backoffMillis);
} else {
diff --git a/flink-connector-kinesis/src/main/resources/META-INF/NOTICE
b/flink-connector-kinesis/src/main/resources/META-INF/NOTICE
index acc0646..2f5d166 100644
--- a/flink-connector-kinesis/src/main/resources/META-INF/NOTICE
+++ b/flink-connector-kinesis/src/main/resources/META-INF/NOTICE
@@ -8,6 +8,7 @@ This project bundles the following dependencies under the
Apache Software Licens
- software.amazon.ion:ion-java:1.0.2
- software.amazon.eventstream:eventstream:1.0.1
+- software.amazon.awssdk:arns:2.20.32
- software.amazon.awssdk:utils:2.20.32
- software.amazon.awssdk:third-party-jackson-dataformat-cbor:2.20.32
- software.amazon.awssdk:third-party-jackson-core:2.20.32
@@ -45,15 +46,15 @@ This project bundles the following dependencies under the
Apache Software Licens
- com.fasterxml.jackson.core:jackson-databind:2.13.4.2
- com.fasterxml.jackson.core:jackson-core:2.13.4
- com.fasterxml.jackson.core:jackson-annotations:2.13.4
-- com.amazonaws:jmespath-java:1.12.276
+- com.amazonaws:jmespath-java:1.12.439
- com.amazonaws:dynamodb-streams-kinesis-adapter:1.5.3
-- com.amazonaws:aws-java-sdk-sts:1.12.276
-- com.amazonaws:aws-java-sdk-s3:1.12.276
-- com.amazonaws:aws-java-sdk-kms:1.12.276
-- com.amazonaws:aws-java-sdk-kinesis:1.12.276
-- com.amazonaws:aws-java-sdk-dynamodb:1.12.276
-- com.amazonaws:aws-java-sdk-core:1.12.276
-- com.amazonaws:aws-java-sdk-cloudwatch:1.12.276
+- com.amazonaws:aws-java-sdk-sts:1.12.439
+- com.amazonaws:aws-java-sdk-s3:1.12.439
+- com.amazonaws:aws-java-sdk-kms:1.12.439
+- com.amazonaws:aws-java-sdk-kinesis:1.12.439
+- com.amazonaws:aws-java-sdk-dynamodb:1.12.439
+- com.amazonaws:aws-java-sdk-core:1.12.439
+- com.amazonaws:aws-java-sdk-cloudwatch:1.12.439
- com.amazonaws:amazon-kinesis-producer:0.14.1
- com.amazonaws:amazon-kinesis-client:1.14.8
diff --git
a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxyTest.java
b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxyTest.java
index 261e69d..8d42962 100644
---
a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxyTest.java
+++
b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxyTest.java
@@ -35,6 +35,8 @@ import static org.assertj.core.api.Assertions.assertThat;
class DynamoDBStreamsProxyTest {
private static final String FAKE_STREAM_NAME = "fake-stream";
+ private static final String FAKE_STREAM_ARN =
+ "arn:aws:kinesis:us-east-1:123456789012:stream/fake-stream";
private static final List<String> SHARD_IDS =
Arrays.asList(
@@ -76,7 +78,7 @@ class DynamoDBStreamsProxyTest {
protected AmazonKinesis createKinesisClient(Properties configProps) {
return
FakeKinesisClientFactory.resourceNotFoundWhenGettingShardIterator(
- FAKE_STREAM_NAME, SHARD_IDS);
+ FAKE_STREAM_NAME, FAKE_STREAM_ARN, SHARD_IDS);
}
}
}
diff --git
a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
index a3e9756..60bf2d2 100644
---
a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
+++
b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
@@ -31,12 +31,16 @@ import com.amazonaws.SdkClientException;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.AmazonKinesisException;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+import com.amazonaws.services.kinesis.model.StreamStatus;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.http.HttpHost;
import org.apache.http.conn.ConnectTimeoutException;
@@ -210,6 +214,7 @@ public class KinesisProxyTest {
"shardId-000000000003");
String nextToken = "NextToken";
String fakeStreamName = "fake-stream";
+ String fakeStreamArn =
"arn:aws:kinesis:us-east-1:123456789012:stream/fake-stream";
List<Shard> shards =
shardIds.stream()
.map(shardId -> new Shard().withShardId(shardId))
@@ -217,6 +222,8 @@ public class KinesisProxyTest {
AmazonKinesis mockClient = mock(AmazonKinesis.class);
KinesisProxy kinesisProxy = getProxy(mockClient);
+
when(mockClient.describeStream(getDescribeStreamRequest(fakeStreamName)))
+ .thenReturn(getDescribeStreamResult(fakeStreamArn));
ListShardsResult responseWithMoreData =
new ListShardsResult().withShards(shards.subList(0,
2)).withNextToken(nextToken);
ListShardsResult responseFinal =
@@ -266,6 +273,7 @@ public class KinesisProxyTest {
KinesisShardIdGenerator.generateFromShardOrder(0),
KinesisShardIdGenerator.generateFromShardOrder(1));
String fakeStreamName = "fake-stream";
+ String fakeStreamArn =
"arn:aws:kinesis:us-east-1:123456789012:stream/fake-stream";
List<Shard> shards =
shardIds.stream()
.map(shardId -> new Shard().withShardId(shardId))
@@ -274,6 +282,8 @@ public class KinesisProxyTest {
AmazonKinesis mockClient = mock(AmazonKinesis.class);
KinesisProxy kinesisProxy = getProxy(mockClient);
+
when(mockClient.describeStream(getDescribeStreamRequest(fakeStreamName)))
+ .thenReturn(getDescribeStreamResult(fakeStreamArn));
ListShardsResult responseFirst =
new ListShardsResult().withShards(shards).withNextToken(null);
doReturn(responseFirst)
@@ -356,13 +366,17 @@ public class KinesisProxyTest {
public void testGetShardWithNoNewShards() throws Exception {
// given
String fakeStreamName = "fake-stream";
+ String fakeStreamArn =
"arn:aws:kinesis:us-east-1:123456789012:stream/fake-stream";
AmazonKinesis mockClient = mock(AmazonKinesis.class);
KinesisProxy kinesisProxy = getProxy(mockClient);
+
when(mockClient.describeStream(getDescribeStreamRequest(fakeStreamName)))
+ .thenReturn(getDescribeStreamResult(fakeStreamArn));
when(mockClient.listShards(
new ListShardsRequest()
.withStreamName(fakeStreamName)
+ .withStreamARN(fakeStreamArn)
.withExclusiveStartShardId(
KinesisShardIdGenerator.generateFromShardOrder(1))))
.thenReturn(new
ListShardsResult().withShards(Collections.emptyList()));
@@ -379,6 +393,9 @@ public class KinesisProxyTest {
@Test
public void testGetShardListRetry() throws Exception {
+ final String streamName = "fake-stream";
+ final String streamArn =
"arn:aws:kinesis:us-east-1:123456789012:stream/fake-stream";
+
Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION,
"us-east-1");
@@ -394,6 +411,8 @@ public class KinesisProxyTest {
};
AmazonKinesisClient mockClient = mock(AmazonKinesisClient.class);
+ when(mockClient.describeStream(getDescribeStreamRequest(streamName)))
+ .thenReturn(getDescribeStreamResult(streamArn));
when(mockClient.listShards(any()))
.thenAnswer(
new Answer<ListShardsResult>() {
@@ -413,11 +432,11 @@ public class KinesisProxyTest {
Whitebox.getField(KinesisProxy.class,
"kinesisClient").set(kinesisProxy, mockClient);
HashMap<String, String> streamNames = new HashMap();
- streamNames.put("fake-stream", null);
+ streamNames.put(streamName, null);
GetShardListResult result = kinesisProxy.getShardList(streamNames);
assertThat(exceptionCount.intValue()).isEqualTo(retriableExceptions.length);
assertThat(result.hasRetrievedShards()).isTrue();
-
assertThat(result.getLastSeenShardOfStream("fake-stream").getShard().getShardId())
+
assertThat(result.getLastSeenShardOfStream(streamName).getShard().getShardId())
.isEqualTo(shard.getShardId());
// test max attempt count exceeded
@@ -545,4 +564,20 @@ public class KinesisProxyTest {
return kinesisProxy;
}
+
+ private DescribeStreamRequest getDescribeStreamRequest(final String
streamName) {
+ DescribeStreamRequest describeStreamRequest = new
DescribeStreamRequest();
+ describeStreamRequest.setStreamName(streamName);
+ return describeStreamRequest;
+ }
+
+ private DescribeStreamResult getDescribeStreamResult(final String
streamArn) {
+ StreamDescription streamDescription = new StreamDescription();
+ streamDescription.setStreamARN(streamArn);
+ streamDescription.setStreamStatus(StreamStatus.ACTIVE);
+
+ DescribeStreamResult describeStreamResult = new DescribeStreamResult();
+ describeStreamResult.setStreamDescription(streamDescription);
+ return describeStreamResult;
+ }
}
diff --git
a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisClientFactory.java
b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisClientFactory.java
index a9f72a0..155c051 100644
---
a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisClientFactory.java
+++
b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisClientFactory.java
@@ -77,6 +77,8 @@ import
com.amazonaws.services.kinesis.model.StartStreamEncryptionRequest;
import com.amazonaws.services.kinesis.model.StartStreamEncryptionResult;
import com.amazonaws.services.kinesis.model.StopStreamEncryptionRequest;
import com.amazonaws.services.kinesis.model.StopStreamEncryptionResult;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+import com.amazonaws.services.kinesis.model.StreamStatus;
import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;
import com.amazonaws.services.kinesis.model.UpdateShardCountResult;
import com.amazonaws.services.kinesis.model.UpdateStreamModeRequest;
@@ -93,13 +95,33 @@ import java.util.List;
public class FakeKinesisClientFactory {
public static AmazonKinesis resourceNotFoundWhenGettingShardIterator(
- String streamName, List<String> shardIds) {
+ String streamName, String streamArn, List<String> shardIds) {
return new AmazonKinesis() {
+ @Override
+ public DescribeStreamResult describeStream(
+ DescribeStreamRequest describeStreamRequest) {
+ if (streamName.equals(describeStreamRequest.getStreamName())) {
+ return new DescribeStreamResult()
+ .withStreamDescription(
+ new StreamDescription()
+ .withStreamARN(streamArn)
+
.withStreamStatus(StreamStatus.ACTIVE));
+ }
+
+ final ResourceNotFoundException ex =
+ new ResourceNotFoundException(
+ "Requested resource not found: Stream does not
exist: "
+ +
describeStreamRequest.getStreamName());
+ ex.setErrorType(AmazonServiceException.ErrorType.Client);
+
+ throw ex;
+ }
+
@Override
public GetShardIteratorResult getShardIterator(
GetShardIteratorRequest getShardIteratorRequest) {
- if (getShardIteratorRequest.getStreamName().equals(streamName)
+ if (streamArn.equals(getShardIteratorRequest.getStreamARN())
&&
shardIds.contains(getShardIteratorRequest.getShardId())) {
return new
GetShardIteratorResult().withShardIterator("fakeShardIterator");
}
@@ -162,12 +184,6 @@ public class FakeKinesisClientFactory {
return null;
}
- @Override
- public DescribeStreamResult describeStream(
- DescribeStreamRequest describeStreamRequest) {
- return null;
- }
-
@Override
public DescribeStreamResult describeStream(String s) {
return null;
diff --git a/pom.xml b/pom.xml
index e5d57af..5dbd627 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@ under the License.
</scm>
<properties>
- <aws.sdkv1.version>1.12.276</aws.sdkv1.version>
+ <aws.sdkv1.version>1.12.439</aws.sdkv1.version>
<aws.sdkv2.version>2.20.32</aws.sdkv2.version>
<netty.version>4.1.86.Final</netty.version>
<flink.version>1.16.0</flink.version>