z3d1k commented on code in PR #143:
URL:
https://github.com/apache/flink-connector-aws/pull/143#discussion_r1642998408
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java:
##########
@@ -59,11 +96,41 @@ void testToSplitType() throws Exception {
}
@Test
- void testOnSplitFinishedIsNoOp() throws Exception {
- KinesisStreamsSourceReader<TestData> sourceReader =
- new KinesisStreamsSourceReader<>(
- null, null, null, new Configuration(), new
TestingReaderContext());
- assertThatNoException()
- .isThrownBy(() ->
sourceReader.onSplitFinished(Collections.emptyMap()));
+ void testOnSplitFinishedShardMetricGroupUnregistered() throws Exception {
+ KinesisShardSplit split = getTestSplit();
+
+ List<KinesisShardSplit> splits = Collections.singletonList(split);
+ sourceReader.addSplits(splits);
+
assertThat(kinesisSourceShardMetrics.getShardMetricGroupMap().get(split.getShardId()))
+ .isNotNull();
+
+ sourceReader.onSplitFinished(
+ Collections.singletonMap(split.getShardId(), new
KinesisShardSplitState(split)));
+
assertThat(kinesisSourceShardMetrics.getShardMetricGroupMap().get(split.getShardId()))
+ .isNull();
+ }
+
+ @Test
+ void testAddSplitsRegistersAndUpdatesShardMetricGroup() throws Exception {
+ KinesisShardSplit split = getTestSplit();
+
+ List<KinesisShardSplit> splits = Collections.singletonList(split);
+ sourceReader.addSplits(splits);
+
+ // Wait for fetcher tasks to finish to assert after the metric is
registered and updated.
+ sourceReader
+ .isAvailable()
+ .whenComplete(
+ (result, throwable) -> {
+ assertThat(
+ kinesisSourceShardMetrics
+ .getShardMetricGroupMap()
+ .get(split.getShardId()))
+ .isNotNull();
+ TestUtil.assertMillisBehindLatest(
+ split,
+ TestUtil.MILLIS_BEHIND_LATEST_TEST_VALUE,
+ metricListener);
+ });
Review Comment:
Replace async handling with blocking one, e.g.
`sourceReader.isAvailable().get()`.
In current state assert result may be ignored, as it is being executed in
different thread.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]