This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new a62e3d8  [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of 
Stream Name for Kinesis API calls
a62e3d8 is described below

commit a62e3d866338f71345a92ac9d7e056eba603a763
Author: Hong Liang Teoh <[email protected]>
AuthorDate: Wed Apr 26 18:29:18 2023 +0100

    [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Name for 
Kinesis API calls
---
 flink-connector-aws-kinesis-streams/pom.xml        |  5 +++
 .../connector/kinesis/sink/KinesisStreamsSink.java | 29 +++++++++++----
 .../kinesis/sink/KinesisStreamsSinkBuilder.java    | 17 ++++++---
 .../kinesis/sink/KinesisStreamsSinkWriter.java     | 15 +++++++-
 .../sink/KinesisStreamsSinkBuilderTest.java        | 29 ++++++++++++++-
 .../kinesis/sink/KinesisStreamsSinkITCase.java     | 41 +++++++++++++---------
 flink-connector-kinesis/pom.xml                    |  4 +++
 .../connectors/kinesis/proxy/KinesisProxy.java     | 31 +++++++++++++---
 .../src/main/resources/META-INF/NOTICE             | 17 ++++-----
 .../kinesis/proxy/DynamoDBStreamsProxyTest.java    |  4 ++-
 .../connectors/kinesis/proxy/KinesisProxyTest.java | 39 ++++++++++++++++++--
 .../testutils/FakeKinesisClientFactory.java        | 32 ++++++++++++-----
 pom.xml                                            |  2 +-
 13 files changed, 210 insertions(+), 55 deletions(-)

diff --git a/flink-connector-aws-kinesis-streams/pom.xml 
b/flink-connector-aws-kinesis-streams/pom.xml
index 6c05ff1..eea58f5 100644
--- a/flink-connector-aws-kinesis-streams/pom.xml
+++ b/flink-connector-aws-kinesis-streams/pom.xml
@@ -58,6 +58,11 @@ under the License.
             <artifactId>kinesis</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>arns</artifactId>
+        </dependency>
+
         <!--Table API dependencies-->
         <dependency>
             <groupId>org.apache.flink</groupId>
diff --git 
a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java
 
b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java
index b6cef5a..5b46196 100644
--- 
a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java
+++ 
b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.util.Preconditions;
 
+import software.amazon.awssdk.arns.Arn;
 import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
 
 import java.io.IOException;
@@ -68,6 +69,7 @@ public class KinesisStreamsSink<InputT> extends 
AsyncSinkBase<InputT, PutRecords
 
     private final boolean failOnError;
     private final String streamName;
+    private final String streamArn;
     private final Properties kinesisClientProperties;
 
     KinesisStreamsSink(
@@ -80,6 +82,7 @@ public class KinesisStreamsSink<InputT> extends 
AsyncSinkBase<InputT, PutRecords
             Long maxRecordSizeInBytes,
             boolean failOnError,
             String streamName,
+            String streamArn,
             Properties kinesisClientProperties) {
         super(
                 elementConverter,
@@ -89,13 +92,23 @@ public class KinesisStreamsSink<InputT> extends 
AsyncSinkBase<InputT, PutRecords
                 maxBatchSizeInBytes,
                 maxTimeInBufferMS,
                 maxRecordSizeInBytes);
-        this.streamName =
-                Preconditions.checkNotNull(
-                        streamName,
-                        "The stream name must not be null when initializing 
the KDS Sink.");
-        Preconditions.checkArgument(
-                !this.streamName.isEmpty(),
-                "The stream name must be set when initializing the KDS Sink.");
+
+        // Ensure that either streamName or streamArn is set. If both are set, 
streamArn takes
+        // precedence.
+        if (streamArn != null) {
+            final Arn arn = Arn.fromString(streamArn);
+            this.streamArn = arn.toString();
+            this.streamName = arn.resource().resource();
+        } else {
+            this.streamArn = null;
+            this.streamName =
+                    Preconditions.checkNotNull(
+                            streamName,
+                            "The stream name must not be null when 
initializing the KDS Sink.");
+            Preconditions.checkArgument(
+                    !this.streamName.isEmpty(),
+                    "The stream name must be set when initializing the KDS 
Sink.");
+        }
         this.failOnError = failOnError;
         this.kinesisClientProperties = kinesisClientProperties;
     }
@@ -126,6 +139,7 @@ public class KinesisStreamsSink<InputT> extends 
AsyncSinkBase<InputT, PutRecords
                 getMaxRecordSizeInBytes(),
                 failOnError,
                 streamName,
+                streamArn,
                 kinesisClientProperties,
                 Collections.emptyList());
     }
@@ -154,6 +168,7 @@ public class KinesisStreamsSink<InputT> extends 
AsyncSinkBase<InputT, PutRecords
                 getMaxRecordSizeInBytes(),
                 failOnError,
                 streamName,
+                streamArn,
                 kinesisClientProperties,
                 recoveredState);
     }
diff --git 
a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilder.java
 
b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilder.java
index 3e6f7ec..b718a78 100644
--- 
a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilder.java
+++ 
b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilder.java
@@ -30,13 +30,13 @@ import java.util.Properties;
  * Builder to construct {@link KinesisStreamsSink}.
  *
  * <p>The following example shows the minimum setup to create a {@link 
KinesisStreamsSink} that
- * writes String values to a Kinesis Data Streams stream named 
your_stream_here.
+ * writes String values to a Kinesis Data Streams stream.
  *
  * <pre>{@code
  * KinesisStreamsSink<String> kdsSink =
  *                 KinesisStreamsSink.<String>builder()
  *                         .setElementConverter(elementConverter)
- *                         .setStreamName("your_stream_name")
+ *                         .setStreamArn("your_stream_arn")
  *                         .setSerializationSchema(new SimpleStringSchema())
  *                         .setPartitionKeyGenerator(element -> 
String.valueOf(element.hashCode()))
  *                         .build();
@@ -71,6 +71,7 @@ public class KinesisStreamsSinkBuilder<InputT>
 
     private Boolean failOnError;
     private String streamName;
+    private String streamArn;
     private Properties kinesisClientProperties;
     private SerializationSchema<InputT> serializationSchema;
     private PartitionKeyGenerator<InputT> partitionKeyGenerator;
@@ -78,18 +79,23 @@ public class KinesisStreamsSinkBuilder<InputT>
     KinesisStreamsSinkBuilder() {}
 
     /**
-     * Sets the name of the KDS stream that the sink will connect to. There is 
no default for this
-     * parameter, therefore, this must be provided at sink creation time 
otherwise the build will
-     * fail.
+     * Providing the stream name is deprecated. Please provide stream ARN 
using {@link
+     * KinesisStreamsSinkBuilder#setStreamArn} instead.
      *
      * @param streamName the name of the stream
      * @return {@link KinesisStreamsSinkBuilder} itself
      */
+    @Deprecated
     public KinesisStreamsSinkBuilder<InputT> setStreamName(String streamName) {
         this.streamName = streamName;
         return this;
     }
 
+    public KinesisStreamsSinkBuilder<InputT> setStreamArn(String streamArn) {
+        this.streamArn = streamArn;
+        return this;
+    }
+
     public KinesisStreamsSinkBuilder<InputT> setSerializationSchema(
             SerializationSchema<InputT> serializationSchema) {
         this.serializationSchema = serializationSchema;
@@ -129,6 +135,7 @@ public class KinesisStreamsSinkBuilder<InputT>
                 
Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B),
                 Optional.ofNullable(failOnError).orElse(DEFAULT_FAIL_ON_ERROR),
                 streamName,
+                streamArn,
                 Optional.ofNullable(kinesisClientProperties).orElse(new 
Properties()));
     }
 }
diff --git 
a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
 
b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
index c1bf1ff..2ff49cb 100644
--- 
a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
+++ 
b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.connector.kinesis.sink;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.connector.aws.util.AWSClientUtil;
 import org.apache.flink.connector.aws.util.AWSGeneralUtil;
@@ -59,6 +60,7 @@ import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptio
  * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} 
and {@code
  * AWS_SECRET_ACCESS_KEY} through environment variables etc.
  */
+@Internal
 class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, 
PutRecordsRequestEntry> {
     private static final Logger LOG = 
LoggerFactory.getLogger(KinesisStreamsSinkWriter.class);
 
@@ -82,6 +84,9 @@ class KinesisStreamsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, PutRecord
     /* Name of the stream in Kinesis Data Streams */
     private final String streamName;
 
+    /* ARN of the stream in Kinesis Data Streams */
+    private final String streamArn;
+
     /* The sink writer metric group */
     private final SinkWriterMetricGroup metrics;
 
@@ -105,6 +110,7 @@ class KinesisStreamsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, PutRecord
             long maxRecordSizeInBytes,
             boolean failOnError,
             String streamName,
+            String streamArn,
             Properties kinesisClientProperties) {
         this(
                 elementConverter,
@@ -117,6 +123,7 @@ class KinesisStreamsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, PutRecord
                 maxRecordSizeInBytes,
                 failOnError,
                 streamName,
+                streamArn,
                 kinesisClientProperties,
                 Collections.emptyList());
     }
@@ -132,6 +139,7 @@ class KinesisStreamsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, PutRecord
             long maxRecordSizeInBytes,
             boolean failOnError,
             String streamName,
+            String streamArn,
             Properties kinesisClientProperties,
             Collection<BufferedRequestState<PutRecordsRequestEntry>> states) {
         super(
@@ -148,6 +156,7 @@ class KinesisStreamsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, PutRecord
                 states);
         this.failOnError = failOnError;
         this.streamName = streamName;
+        this.streamArn = streamArn;
         this.metrics = context.metricGroup();
         this.numRecordsOutErrorsCounter = 
metrics.getNumRecordsOutErrorsCounter();
         this.httpClient = 
AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);
@@ -172,7 +181,11 @@ class KinesisStreamsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, PutRecord
             Consumer<List<PutRecordsRequestEntry>> requestResult) {
 
         PutRecordsRequest batchRequest =
-                
PutRecordsRequest.builder().records(requestEntries).streamName(streamName).build();
+                PutRecordsRequest.builder()
+                        .records(requestEntries)
+                        .streamName(streamName)
+                        .streamARN(streamArn)
+                        .build();
 
         CompletableFuture<PutRecordsResponse> future = 
kinesisClient.putRecords(batchRequest);
 
diff --git 
a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilderTest.java
 
b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilderTest.java
index ce88c63..539decc 100644
--- 
a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilderTest.java
+++ 
b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilderTest.java
@@ -39,7 +39,7 @@ class KinesisStreamsSinkBuilderTest {
     }
 
     @Test
-    void streamNameOfSinkMustBeSetWhenBuilt() {
+    void streamNameOrStreamArnMustBeSetWhenBuilt() {
         Assertions.assertThatExceptionOfType(NullPointerException.class)
                 .isThrownBy(
                         () ->
@@ -65,6 +65,33 @@ class KinesisStreamsSinkBuilderTest {
                         "The stream name must be set when initializing the KDS 
Sink.");
     }
 
+    @Test
+    void streamArnMustNotBeEmptyWhenBuilt() {
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                KinesisStreamsSink.<String>builder()
+                                        
.setPartitionKeyGenerator(PARTITION_KEY_GENERATOR)
+                                        
.setSerializationSchema(SERIALIZATION_SCHEMA)
+                                        .setStreamArn("")
+                                        .build())
+                .withMessageContaining("Malformed ARN - doesn't start with 
'arn:'");
+    }
+
+    @Test
+    void streamArnResouceMustNotBeEmptyWhenBuilt() {
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                KinesisStreamsSink.<String>builder()
+                                        
.setPartitionKeyGenerator(PARTITION_KEY_GENERATOR)
+                                        
.setSerializationSchema(SERIALIZATION_SCHEMA)
+                                        .setStreamArn(
+                                                
"arn:aws:kinesis:us-east-1:000000000000:stream/")
+                                        .build())
+                .withMessageContaining("resource must not be blank or empty.");
+    }
+
     @Test
     void serializationSchemaMustBeSetWhenSinkIsBuilt() {
         Assertions.assertThatExceptionOfType(NullPointerException.class)
diff --git 
a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
 
b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
index 2360558..cee11e5 100644
--- 
a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
+++ 
b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
@@ -173,7 +173,8 @@ class KinesisStreamsSinkITCase {
                                         .runScenario())
                 .havingCause()
                 .havingCause()
-                .withMessageContaining("Encountered non-recoverable 
exception");
+                .havingCause()
+                .withMessageContaining("Stream non-existent-stream under 
account");
     }
 
     @Test
@@ -343,18 +344,21 @@ class KinesisStreamsSinkITCase {
                 false, "WRONG", "Invalid AWS Credential Provider Type");
     }
 
-    private void credentialsProvidedThroughProfilePathShouldResultInFailure(
-            boolean failOnError,
-            String credentialsProvider,
-            String credentialsProfileLocation,
-            String expectedMessage) {
-        Properties properties =
-                
getDefaultPropertiesWithoutCredentialsSetAndCredentialProvider(credentialsProvider);
-        properties.put(
-                AWSConfigConstants.profilePath(AWS_CREDENTIALS_PROVIDER),
-                credentialsProfileLocation);
-        assertRunWithPropertiesAndStreamShouldFailWithExceptionOfType(
-                failOnError, properties, expectedMessage);
+    @Test
+    void streamArnShouldTakePrecedenceOverStreamName() {
+        Assertions.assertThatExceptionOfType(JobExecutionException.class)
+                .isThrownBy(
+                        () ->
+                                new Scenario()
+                                        
.withKinesaliteStreamName("stream-exists")
+                                        .withSinkConnectionStreamArn(
+                                                
"arn:aws:kinesis:us-east-1:000000000000:stream/stream-not-exists")
+                                        
.withSinkConnectionStreamName("stream-exists")
+                                        .withFailOnError(true)
+                                        .withProperties(getDefaultProperties())
+                                        .runScenario())
+                .withStackTraceContaining(
+                        "Stream stream-not-exists under account 000000000000 
not found");
     }
 
     private void 
noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(
@@ -375,9 +379,7 @@ class KinesisStreamsSinkITCase {
                                         .withFailOnError(failOnError)
                                         .withProperties(properties)
                                         .runScenario())
-                .havingCause()
-                .havingCause()
-                .withMessageContaining(expectedMessage);
+                .withStackTraceContaining(expectedMessage);
     }
 
     private Properties 
getDefaultPropertiesWithoutCredentialsSetAndCredentialProvider(
@@ -408,6 +410,7 @@ class KinesisStreamsSinkITCase {
         private boolean failOnError = false;
         private String kinesaliteStreamName = null;
         private String sinkConnectionStreamName;
+        private String sinkConnectionStreamArn;
         private SerializationSchema<String> serializationSchema =
                 KinesisStreamsSinkITCase.this.serializationSchema;
         private PartitionKeyGenerator<String> partitionKeyGenerator =
@@ -440,6 +443,7 @@ class KinesisStreamsSinkITCase {
                             .setFailOnError(failOnError)
                             .setMaxBufferedRequests(1000)
                             .setStreamName(sinkConnectionStreamName)
+                            .setStreamArn(sinkConnectionStreamArn)
                             .setKinesisClientProperties(properties)
                             .setFailOnError(true)
                             .build();
@@ -509,6 +513,11 @@ class KinesisStreamsSinkITCase {
             return this;
         }
 
+        public Scenario withSinkConnectionStreamArn(String 
sinkConnectionStreamArn) {
+            this.sinkConnectionStreamArn = sinkConnectionStreamArn;
+            return this;
+        }
+
         public Scenario withKinesaliteStreamName(String kinesaliteStreamName) {
             this.kinesaliteStreamName = kinesaliteStreamName;
             return this;
diff --git a/flink-connector-kinesis/pom.xml b/flink-connector-kinesis/pom.xml
index 1194941..e2b0f78 100644
--- a/flink-connector-kinesis/pom.xml
+++ b/flink-connector-kinesis/pom.xml
@@ -74,6 +74,10 @@ under the License.
             <groupId>com.amazonaws</groupId>
             <artifactId>aws-java-sdk-cloudwatch</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-core</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.amazonaws</groupId>
             <artifactId>amazon-kinesis-producer</artifactId>
diff --git 
a/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 
b/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index bfa926e..b5b2b37 100644
--- 
a/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ 
b/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -58,6 +58,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -140,6 +141,9 @@ public class KinesisProxy implements KinesisProxyInterface {
     /** Exponential backoff power constant for the describe stream operation. 
*/
     private final double describeStreamExpConstant;
 
+    /** Caches retrieved stream ARNs for given stream names. */
+    private final Map<String, String> streamNameToArnLookup = new 
ConcurrentHashMap<>();
+
     /**
      * Create a new KinesisProxy based on the supplied configuration 
properties.
      *
@@ -350,6 +354,7 @@ public class KinesisProxy implements KinesisProxyInterface {
         GetShardIteratorRequest getShardIteratorRequest =
                 new GetShardIteratorRequest()
                         .withStreamName(shard.getStreamName())
+                        .withStreamARN(lookupStreamArn(shard.getStreamName()))
                         .withShardId(shard.getShard().getShardId())
                         .withShardIteratorType(shardIteratorType);
 
@@ -464,13 +469,15 @@ public class KinesisProxy implements 
KinesisProxyInterface {
             String streamName, @Nullable String lastSeenShardId) throws 
InterruptedException {
         List<StreamShardHandle> shardsOfStream = new ArrayList<>();
 
+        String streamArn = lookupStreamArn(streamName);
+
         // List Shards returns just the first 1000 shard entries. In order to 
read the entire
         // stream,
         // we need to use the returned nextToken to get additional shards.
         ListShardsResult listShardsResult;
         String startShardToken = null;
         do {
-            listShardsResult = listShards(streamName, lastSeenShardId, 
startShardToken);
+            listShardsResult = listShards(streamName, streamArn, 
lastSeenShardId, startShardToken);
             if (listShardsResult == null) {
                 // In case we have exceptions while retrieving all shards, 
ensure that incomplete
                 // shard list is not returned.
@@ -488,6 +495,15 @@ public class KinesisProxy implements KinesisProxyInterface 
{
         return shardsOfStream;
     }
 
+    private synchronized String lookupStreamArn(String streamName) throws 
InterruptedException {
+        if (streamNameToArnLookup.containsKey(streamName)) {
+            return streamNameToArnLookup.get(streamName);
+        }
+        String streamArn = describeStream(streamName, 
null).getStreamDescription().getStreamARN();
+        streamNameToArnLookup.put(streamName, streamArn);
+        return streamArn;
+    }
+
     /**
      * Get metainfo for a Kinesis stream, which contains information about 
which shards this Kinesis
      * stream possess.
@@ -498,18 +514,23 @@ public class KinesisProxy implements 
KinesisProxyInterface {
      * subtask's fetcher. This jitter backoff approach will help distribute 
calls across the
      * fetchers over time.
      *
-     * @param streamName the stream to describe
+     * @param streamName the stream name
+     * @param streamArn the stream ARN
      * @param startShardId which shard to start with for this describe 
operation (earlier shard's
      *     infos will not appear in result)
      * @return the result of the describe stream operation
      */
     private ListShardsResult listShards(
-            String streamName, @Nullable String startShardId, @Nullable String 
startNextToken)
+            String streamName,
+            String streamArn,
+            @Nullable String startShardId,
+            @Nullable String startNextToken)
             throws InterruptedException {
         final ListShardsRequest listShardsRequest = new ListShardsRequest();
         if (startNextToken == null) {
             listShardsRequest.setExclusiveStartShardId(startShardId);
             listShardsRequest.setStreamName(streamName);
+            listShardsRequest.setStreamARN(streamArn);
         } else {
             // Note the nextToken returned by AWS expires within 300 sec.
             listShardsRequest.setNextToken(startNextToken);
@@ -535,7 +556,7 @@ public class KinesisProxy implements KinesisProxyInterface {
                                 retryCount++);
                 LOG.warn(
                         "Got LimitExceededException when listing shards from 
stream "
-                                + streamName
+                                + streamArn
                                 + ". Backing off for "
                                 + backoffMillis
                                 + " millis.");
@@ -567,7 +588,7 @@ public class KinesisProxy implements KinesisProxyInterface {
                                     retryCount++);
                     LOG.warn(
                             "Got SdkClientException when listing shards from 
stream {}. Backing off for {} millis.",
-                            streamName,
+                            streamArn,
                             backoffMillis);
                     BACKOFF.sleep(backoffMillis);
                 } else {
diff --git a/flink-connector-kinesis/src/main/resources/META-INF/NOTICE 
b/flink-connector-kinesis/src/main/resources/META-INF/NOTICE
index acc0646..2f5d166 100644
--- a/flink-connector-kinesis/src/main/resources/META-INF/NOTICE
+++ b/flink-connector-kinesis/src/main/resources/META-INF/NOTICE
@@ -8,6 +8,7 @@ This project bundles the following dependencies under the 
Apache Software Licens
 
 - software.amazon.ion:ion-java:1.0.2
 - software.amazon.eventstream:eventstream:1.0.1
+- software.amazon.awssdk:arns:2.20.32
 - software.amazon.awssdk:utils:2.20.32
 - software.amazon.awssdk:third-party-jackson-dataformat-cbor:2.20.32
 - software.amazon.awssdk:third-party-jackson-core:2.20.32
@@ -45,15 +46,15 @@ This project bundles the following dependencies under the 
Apache Software Licens
 - com.fasterxml.jackson.core:jackson-databind:2.13.4.2
 - com.fasterxml.jackson.core:jackson-core:2.13.4
 - com.fasterxml.jackson.core:jackson-annotations:2.13.4
-- com.amazonaws:jmespath-java:1.12.276
+- com.amazonaws:jmespath-java:1.12.439
 - com.amazonaws:dynamodb-streams-kinesis-adapter:1.5.3
-- com.amazonaws:aws-java-sdk-sts:1.12.276
-- com.amazonaws:aws-java-sdk-s3:1.12.276
-- com.amazonaws:aws-java-sdk-kms:1.12.276
-- com.amazonaws:aws-java-sdk-kinesis:1.12.276
-- com.amazonaws:aws-java-sdk-dynamodb:1.12.276
-- com.amazonaws:aws-java-sdk-core:1.12.276
-- com.amazonaws:aws-java-sdk-cloudwatch:1.12.276
+- com.amazonaws:aws-java-sdk-sts:1.12.439
+- com.amazonaws:aws-java-sdk-s3:1.12.439
+- com.amazonaws:aws-java-sdk-kms:1.12.439
+- com.amazonaws:aws-java-sdk-kinesis:1.12.439
+- com.amazonaws:aws-java-sdk-dynamodb:1.12.439
+- com.amazonaws:aws-java-sdk-core:1.12.439
+- com.amazonaws:aws-java-sdk-cloudwatch:1.12.439
 - com.amazonaws:amazon-kinesis-producer:0.14.1
 - com.amazonaws:amazon-kinesis-client:1.14.8
 
diff --git 
a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxyTest.java
 
b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxyTest.java
index 261e69d..8d42962 100644
--- 
a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxyTest.java
+++ 
b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxyTest.java
@@ -35,6 +35,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 class DynamoDBStreamsProxyTest {
 
     private static final String FAKE_STREAM_NAME = "fake-stream";
+    private static final String FAKE_STREAM_ARN =
+            "arn:aws:kinesis:us-east-1:123456789012:stream/fake-stream";
 
     private static final List<String> SHARD_IDS =
             Arrays.asList(
@@ -76,7 +78,7 @@ class DynamoDBStreamsProxyTest {
 
         protected AmazonKinesis createKinesisClient(Properties configProps) {
             return 
FakeKinesisClientFactory.resourceNotFoundWhenGettingShardIterator(
-                    FAKE_STREAM_NAME, SHARD_IDS);
+                    FAKE_STREAM_NAME, FAKE_STREAM_ARN, SHARD_IDS);
         }
     }
 }
diff --git 
a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 
b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
index a3e9756..60bf2d2 100644
--- 
a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
+++ 
b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
@@ -31,12 +31,16 @@ import com.amazonaws.SdkClientException;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.model.AmazonKinesisException;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.ListShardsRequest;
 import com.amazonaws.services.kinesis.model.ListShardsResult;
 import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+import com.amazonaws.services.kinesis.model.StreamStatus;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.http.HttpHost;
 import org.apache.http.conn.ConnectTimeoutException;
@@ -210,6 +214,7 @@ public class KinesisProxyTest {
                         "shardId-000000000003");
         String nextToken = "NextToken";
         String fakeStreamName = "fake-stream";
+        String fakeStreamArn = 
"arn:aws:kinesis:us-east-1:123456789012:stream/fake-stream";
         List<Shard> shards =
                 shardIds.stream()
                         .map(shardId -> new Shard().withShardId(shardId))
@@ -217,6 +222,8 @@ public class KinesisProxyTest {
         AmazonKinesis mockClient = mock(AmazonKinesis.class);
         KinesisProxy kinesisProxy = getProxy(mockClient);
 
+        
when(mockClient.describeStream(getDescribeStreamRequest(fakeStreamName)))
+                .thenReturn(getDescribeStreamResult(fakeStreamArn));
         ListShardsResult responseWithMoreData =
                 new ListShardsResult().withShards(shards.subList(0, 
2)).withNextToken(nextToken);
         ListShardsResult responseFinal =
@@ -266,6 +273,7 @@ public class KinesisProxyTest {
                         KinesisShardIdGenerator.generateFromShardOrder(0),
                         KinesisShardIdGenerator.generateFromShardOrder(1));
         String fakeStreamName = "fake-stream";
+        String fakeStreamArn = 
"arn:aws:kinesis:us-east-1:123456789012:stream/fake-stream";
         List<Shard> shards =
                 shardIds.stream()
                         .map(shardId -> new Shard().withShardId(shardId))
@@ -274,6 +282,8 @@ public class KinesisProxyTest {
         AmazonKinesis mockClient = mock(AmazonKinesis.class);
         KinesisProxy kinesisProxy = getProxy(mockClient);
 
+        
when(mockClient.describeStream(getDescribeStreamRequest(fakeStreamName)))
+                .thenReturn(getDescribeStreamResult(fakeStreamArn));
         ListShardsResult responseFirst =
                 new ListShardsResult().withShards(shards).withNextToken(null);
         doReturn(responseFirst)
@@ -356,13 +366,17 @@ public class KinesisProxyTest {
     public void testGetShardWithNoNewShards() throws Exception {
         // given
         String fakeStreamName = "fake-stream";
+        String fakeStreamArn = 
"arn:aws:kinesis:us-east-1:123456789012:stream/fake-stream";
 
         AmazonKinesis mockClient = mock(AmazonKinesis.class);
         KinesisProxy kinesisProxy = getProxy(mockClient);
 
+        
when(mockClient.describeStream(getDescribeStreamRequest(fakeStreamName)))
+                .thenReturn(getDescribeStreamResult(fakeStreamArn));
         when(mockClient.listShards(
                         new ListShardsRequest()
                                 .withStreamName(fakeStreamName)
+                                .withStreamARN(fakeStreamArn)
                                 .withExclusiveStartShardId(
                                         
KinesisShardIdGenerator.generateFromShardOrder(1))))
                 .thenReturn(new 
ListShardsResult().withShards(Collections.emptyList()));
@@ -379,6 +393,9 @@ public class KinesisProxyTest {
 
     @Test
     public void testGetShardListRetry() throws Exception {
+        final String streamName = "fake-stream";
+        final String streamArn = 
"arn:aws:kinesis:us-east-1:123456789012:stream/fake-stream";
+
         Properties kinesisConsumerConfig = new Properties();
         kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
 
@@ -394,6 +411,8 @@ public class KinesisProxyTest {
                 };
 
         AmazonKinesisClient mockClient = mock(AmazonKinesisClient.class);
+        when(mockClient.describeStream(getDescribeStreamRequest(streamName)))
+                .thenReturn(getDescribeStreamResult(streamArn));
         when(mockClient.listShards(any()))
                 .thenAnswer(
                         new Answer<ListShardsResult>() {
@@ -413,11 +432,11 @@ public class KinesisProxyTest {
         Whitebox.getField(KinesisProxy.class, 
"kinesisClient").set(kinesisProxy, mockClient);
 
         HashMap<String, String> streamNames = new HashMap();
-        streamNames.put("fake-stream", null);
+        streamNames.put(streamName, null);
         GetShardListResult result = kinesisProxy.getShardList(streamNames);
         
assertThat(exceptionCount.intValue()).isEqualTo(retriableExceptions.length);
         assertThat(result.hasRetrievedShards()).isTrue();
-        
assertThat(result.getLastSeenShardOfStream("fake-stream").getShard().getShardId())
+        
assertThat(result.getLastSeenShardOfStream(streamName).getShard().getShardId())
                 .isEqualTo(shard.getShardId());
 
         // test max attempt count exceeded
@@ -545,4 +564,20 @@ public class KinesisProxyTest {
 
         return kinesisProxy;
     }
+
+    private DescribeStreamRequest getDescribeStreamRequest(final String 
streamName) {
+        DescribeStreamRequest describeStreamRequest = new 
DescribeStreamRequest();
+        describeStreamRequest.setStreamName(streamName);
+        return describeStreamRequest;
+    }
+
+    private DescribeStreamResult getDescribeStreamResult(final String 
streamArn) {
+        StreamDescription streamDescription = new StreamDescription();
+        streamDescription.setStreamARN(streamArn);
+        streamDescription.setStreamStatus(StreamStatus.ACTIVE);
+
+        DescribeStreamResult describeStreamResult = new DescribeStreamResult();
+        describeStreamResult.setStreamDescription(streamDescription);
+        return describeStreamResult;
+    }
 }
diff --git 
a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisClientFactory.java
 
b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisClientFactory.java
index a9f72a0..155c051 100644
--- 
a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisClientFactory.java
+++ 
b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisClientFactory.java
@@ -77,6 +77,8 @@ import 
com.amazonaws.services.kinesis.model.StartStreamEncryptionRequest;
 import com.amazonaws.services.kinesis.model.StartStreamEncryptionResult;
 import com.amazonaws.services.kinesis.model.StopStreamEncryptionRequest;
 import com.amazonaws.services.kinesis.model.StopStreamEncryptionResult;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+import com.amazonaws.services.kinesis.model.StreamStatus;
 import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;
 import com.amazonaws.services.kinesis.model.UpdateShardCountResult;
 import com.amazonaws.services.kinesis.model.UpdateStreamModeRequest;
@@ -93,13 +95,33 @@ import java.util.List;
 public class FakeKinesisClientFactory {
 
     public static AmazonKinesis resourceNotFoundWhenGettingShardIterator(
-            String streamName, List<String> shardIds) {
+            String streamName, String streamArn, List<String> shardIds) {
         return new AmazonKinesis() {
 
+            @Override
+            public DescribeStreamResult describeStream(
+                    DescribeStreamRequest describeStreamRequest) {
+                if (streamName.equals(describeStreamRequest.getStreamName())) {
+                    return new DescribeStreamResult()
+                            .withStreamDescription(
+                                    new StreamDescription()
+                                            .withStreamARN(streamArn)
+                                            
.withStreamStatus(StreamStatus.ACTIVE));
+                }
+
+                final ResourceNotFoundException ex =
+                        new ResourceNotFoundException(
+                                "Requested resource not found: Stream does not 
exist: "
+                                        + 
describeStreamRequest.getStreamName());
+                ex.setErrorType(AmazonServiceException.ErrorType.Client);
+
+                throw ex;
+            }
+
             @Override
             public GetShardIteratorResult getShardIterator(
                     GetShardIteratorRequest getShardIteratorRequest) {
-                if (getShardIteratorRequest.getStreamName().equals(streamName)
+                if (streamArn.equals(getShardIteratorRequest.getStreamARN())
                         && 
shardIds.contains(getShardIteratorRequest.getShardId())) {
                     return new 
GetShardIteratorResult().withShardIterator("fakeShardIterator");
                 }
@@ -162,12 +184,6 @@ public class FakeKinesisClientFactory {
                 return null;
             }
 
-            @Override
-            public DescribeStreamResult describeStream(
-                    DescribeStreamRequest describeStreamRequest) {
-                return null;
-            }
-
             @Override
             public DescribeStreamResult describeStream(String s) {
                 return null;
diff --git a/pom.xml b/pom.xml
index e5d57af..5dbd627 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@ under the License.
     </scm>
 
     <properties>
-        <aws.sdkv1.version>1.12.276</aws.sdkv1.version>
+        <aws.sdkv1.version>1.12.439</aws.sdkv1.version>
         <aws.sdkv2.version>2.20.32</aws.sdkv2.version>
         <netty.version>4.1.86.Final</netty.version>
         <flink.version>1.16.0</flink.version>


Reply via email to