karubian commented on code in PR #143:
URL:
https://github.com/apache/flink-connector-aws/pull/143#discussion_r1643015195
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java:
##########
@@ -59,11 +96,41 @@ void testToSplitType() throws Exception {
}
@Test
- void testOnSplitFinishedIsNoOp() throws Exception {
- KinesisStreamsSourceReader<TestData> sourceReader =
- new KinesisStreamsSourceReader<>(
- null, null, null, new Configuration(), new TestingReaderContext());
- assertThatNoException()
- .isThrownBy(() -> sourceReader.onSplitFinished(Collections.emptyMap()));
+ void testOnSplitFinishedShardMetricGroupUnregistered() throws Exception {
+ KinesisShardSplit split = getTestSplit();
+
+ List<KinesisShardSplit> splits = Collections.singletonList(split);
+ sourceReader.addSplits(splits);
+
+ assertThat(kinesisSourceShardMetrics.getShardMetricGroupMap().get(split.getShardId()))
+ .isNotNull();
+
+ sourceReader.onSplitFinished(
+ Collections.singletonMap(split.getShardId(), new KinesisShardSplitState(split)));
+
+ assertThat(kinesisSourceShardMetrics.getShardMetricGroupMap().get(split.getShardId()))
+ .isNull();
+ }
+
+ @Test
+ void testAddSplitsRegistersAndUpdatesShardMetricGroup() throws Exception {
+ KinesisShardSplit split = getTestSplit();
+
+ List<KinesisShardSplit> splits = Collections.singletonList(split);
+ sourceReader.addSplits(splits);
+
+ // Wait for fetcher tasks to finish to assert after the metric is registered and updated.
+ sourceReader
+ .isAvailable()
+ .whenComplete(
+ (result, throwable) -> {
+ assertThat(
+ kinesisSourceShardMetrics
+ .getShardMetricGroupMap()
+ .get(split.getShardId()))
+ .isNotNull();
+ TestUtil.assertMillisBehindLatest(
+ split,
+ TestUtil.MILLIS_BEHIND_LATEST_TEST_VALUE,
+ metricListener);
+ });
Review Comment:
Thank you for spotting this! I have updated the test and verified that the
assertion is now executed in the same thread as the test.
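For context, here is a minimal standalone sketch of why the thread matters for assertions placed inside `whenComplete`. It uses only `java.util.concurrent.CompletableFuture` from the JDK; the class and method names (`WhenCompleteSketch`, `callbackThreadFor`, and so on) are hypothetical and are not part of this PR or of the connector. The sketch assumes the general `CompletableFuture` contract: an error thrown inside the callback is captured in the future returned by `whenComplete` and does not reach the caller unless that future is joined.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration class, not part of flink-connector-aws.
final class WhenCompleteSketch {

    /** Returns the name of the thread that ran the whenComplete callback. */
    static String callbackThreadFor(CompletableFuture<Void> future) {
        AtomicReference<String> threadName = new AtomicReference<>();
        future.whenComplete(
                        (result, throwable) ->
                                threadName.set(Thread.currentThread().getName()))
                .join(); // wait until the callback has run
        return threadName.get();
    }

    /** Returns true only if an error thrown in the callback reaches the caller directly. */
    static boolean failureInCallbackPropagates() {
        try {
            CompletableFuture.<Void>completedFuture(null)
                    .whenComplete(
                            (result, throwable) -> {
                                throw new AssertionError("assertion failed in callback");
                            });
            // The error was stored in the returned future, which is discarded here.
            return false;
        } catch (AssertionError e) {
            return true;
        }
    }

    /** Joins the future returned by whenComplete so a failed assertion is surfaced. */
    static Throwable failureSurfacedByJoin() {
        CompletableFuture<Void> checked =
                CompletableFuture.<Void>completedFuture(null)
                        .whenComplete(
                                (result, throwable) -> {
                                    throw new AssertionError("assertion failed in callback");
                                });
        try {
            checked.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause(); // the original AssertionError
        }
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        String callback = callbackThreadFor(CompletableFuture.completedFuture(null));
        System.out.println("callback ran on caller thread: " + caller.equals(callback));
        System.out.println("failure propagates unaided: " + failureInCallbackPropagates());
        System.out.println("failure surfaced by join: " + failureSurfacedByJoin());
    }
}
```

When the future is already complete, the callback runs synchronously on the calling thread. Otherwise it runs on whichever thread completes the future, and a failed assertion there would not fail the test unless the returned future is joined or the assertion is moved to the test thread.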