Github user kailashhd commented on a diff in the pull request:
https://github.com/apache/flink/pull/5992#discussion_r188377393
--- Diff:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
---
@@ -382,50 +386,62 @@ protected static boolean
isRecoverableException(AmazonServiceException ex) {
* @param startShardId which shard to start with for this describe
operation (earlier shard's infos will not appear in result)
* @return the result of the describe stream operation
*/
- private DescribeStreamResult describeStream(String streamName,
@Nullable String startShardId) throws InterruptedException {
- final DescribeStreamRequest describeStreamRequest = new
DescribeStreamRequest();
- describeStreamRequest.setStreamName(streamName);
- describeStreamRequest.setExclusiveStartShardId(startShardId);
+ private ListShardsResult listShards(String streamName, @Nullable String
startShardId,
+
@Nullable String startNextToken)
+ throws InterruptedException {
+ final ListShardsRequest listShardsRequest = new
ListShardsRequest();
+ if (startNextToken == null) {
+
listShardsRequest.setExclusiveStartShardId(startShardId);
+ listShardsRequest.setStreamName(streamName);
+ } else {
+ // Note the nextToken returned by AWS expires within
300 sec.
+ listShardsRequest.setNextToken(startNextToken);
+ }
- DescribeStreamResult describeStreamResult = null;
+ ListShardsResult listShardsResults = null;
- // Call DescribeStream, with full-jitter backoff (if we get
LimitExceededException).
+ // Call ListShards, with full-jitter backoff (if we get
LimitExceededException).
int attemptCount = 0;
- while (describeStreamResult == null) { // retry until we get a
result
+ // List Shards returns just the first 1000 shard entries. Make
sure that all entries
+ // are taken up.
+ while (listShardsResults == null) { // retry until we get a
result
try {
- describeStreamResult =
kinesisClient.describeStream(describeStreamRequest);
+ listShardsResults =
kinesisClient.listShards(listShardsRequest);
} catch (LimitExceededException le) {
long backoffMillis = fullJitterBackoff(
- describeStreamBaseBackoffMillis,
describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
- LOG.warn("Got LimitExceededException when
describing stream " + streamName + ". Backing off for "
- + backoffMillis + " millis.");
+ listShardsBaseBackoffMillis,
listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
+ LOG.warn("Got LimitExceededException
when listing shards from stream " + streamName
+ + ".
Backing off for " + backoffMillis + " millis.");
Thread.sleep(backoffMillis);
- } catch (ResourceNotFoundException re) {
- throw new RuntimeException("Error while getting
stream details", re);
- }
- }
-
- String streamStatus =
describeStreamResult.getStreamDescription().getStreamStatus();
- if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) ||
streamStatus.equals(StreamStatus.UPDATING.toString()))) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("The status of stream " + streamName +
" is " + streamStatus + "; result of the current " +
- "describeStream operation will not
contain any shard information.");
+ } catch (ResourceInUseException reInUse) {
+ if (LOG.isWarnEnabled()) {
+ // List Shards will throw an exception
if stream is not in active state. Will return
+ LOG.warn("The stream is currently not
in active state. Reusing the older state "
+ + "for the time being");
+ break;
+ }
+ } catch (ResourceNotFoundException reNotFound) {
+ throw new RuntimeException("Stream not found.
Error while getting shard list.", reNotFound);
+ } catch (InvalidArgumentException inArg) {
+ throw new RuntimeException("Invalid Arguments
to listShards.", inArg);
+ } catch (ExpiredNextTokenException expiredToken) {
+ LOG.warn("List Shards has an expired token.
Reusing the previous state.");
+ break;
}
}
-
- // Kinesalite (mock implementation of Kinesis) does not
correctly exclude shards before the exclusive
- // start shard id in the returned shards list; check if we need
to remove these erroneously returned shards
- if (startShardId != null) {
- List<Shard> shards =
describeStreamResult.getStreamDescription().getShards();
+ // Kinesalite (mock implementation of Kinesis) does not correctly
exclude shards before
+ // the exclusive start shard id in the returned shards list;
check if we need to remove
+ // these erroneously returned shards.
+ if (startShardId != null && listShardsResults != null) {
+ List<Shard> shards = listShardsResults.getShards();
Iterator<Shard> shardItr = shards.iterator();
- while (shardItr.hasNext()) {
+ while (shardItr.hasNext()){
--- End diff --
Done.
---