Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5992#discussion_r187788363
--- Diff:
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
---
@@ -70,20 +113,107 @@ public void testIsRecoverableExceptionWithNullErrorType() {
}
@Test
- public void testCustomConfigurationOverride() {
- Properties configProps = new Properties();
- configProps.setProperty(AWSConfigConstants.AWS_REGION,
"us-east-1");
- KinesisProxy proxy = new KinesisProxy(configProps) {
- @Override
- protected AmazonKinesis createKinesisClient(Properties
configProps) {
- ClientConfiguration clientConfig = new
ClientConfigurationFactory().getConfig();
- clientConfig.setSocketTimeout(10000);
- return AWSUtil.createKinesisClient(configProps,
clientConfig);
+ public void testGetShardList() throws Exception {
+ List<String> shardIds =
+ Arrays.asList(
+ "shardId-000000000000",
+ "shardId-000000000001",
+ "shardId-000000000002",
+ "shardId-000000000003");
+ shardIdSet = new HashSet<>(shardIds);
+ shards =
+ shardIds
+ .stream()
+ .map(shardId -> new
Shard().withShardId(shardId))
+ .collect(Collectors.toList());
+ Properties kinesisConsumerConfig = new Properties();
+
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION,
"us-east-1");
+
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID,
"fake_accesskey");
+ kinesisConsumerConfig.setProperty(
+ ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY,
"fake_secretkey");
+ KinesisProxy kinesisProxy = new
KinesisProxy(kinesisConsumerConfig);
+ AmazonKinesis mockClient = mock(AmazonKinesis.class);
+ Whitebox.setInternalState(kinesisProxy, "kinesisClient",
mockClient);
+
+ ListShardsResult responseWithMoreData =
+ new
ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN);
+ ListShardsResult responseFinal =
+ new
ListShardsResult().withShards(shards.subList(2,
shards.size())).withNextToken(null);
+ doReturn(responseWithMoreData)
+ .when(mockClient)
+
.listShards(argThat(initialListShardsRequestMatcher()));
+
doReturn(responseFinal).when(mockClient).listShards(argThat(listShardsNextToken(NEXT_TOKEN)));
+ HashMap<String, String> streamHashMap =
+
createInitialSubscribedStreamsToLastDiscoveredShardsState(Arrays.asList(fakeStreamName));
+ GetShardListResult shardListResult =
kinesisProxy.getShardList(streamHashMap);
+
+ Assert.assertEquals(shardListResult.hasRetrievedShards(), true);
+
+ Set<String> expectedStreams = new HashSet<>();
+ expectedStreams.add(fakeStreamName);
+
Assert.assertEquals(shardListResult.getStreamsWithRetrievedShards(),
expectedStreams);
+ List<StreamShardHandle> actualShardList =
+
shardListResult.getRetrievedShardListOfStream(fakeStreamName);
+ List<StreamShardHandle> expectedStreamShard = new ArrayList<>();
+ System.out.println(actualShardList.toString());
+ assertThat(actualShardList, hasSize(4));
+ for (int i = 0; i < 4; i++) {
+ StreamShardHandle shardHandle =
+ new StreamShardHandle(
+ fakeStreamName,
+ new
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i)));
+ expectedStreamShard.add(shardHandle);
+ }
+
+ Assert.assertThat(
+ actualShardList,
+ containsInAnyOrder(
+ expectedStreamShard.toArray(new
StreamShardHandle[actualShardList.size()])));
+ }
+
+ private static class ListShardsRequestMatcher extends
TypeSafeDiagnosingMatcher<ListShardsRequest> {
+ private final String shardId;
+ private final String nextToken;
+
+ ListShardsRequestMatcher(String shardIdArg, String
nextTokenArg) {
+ shardId = shardIdArg;
+ nextToken = nextTokenArg;
+ }
+
+ @Override
+ protected boolean matchesSafely(final ListShardsRequest
listShardsRequest, final Description description) {
+ if (shardId == null) {
+ if
(StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) {
--- End diff --
Can we avoid using `StringUtils` here and just use a plain null check together with `!String#isEmpty()` instead?
---