karubian commented on code in PR #143:
URL: 
https://github.com/apache/flink-connector-aws/pull/143#discussion_r1643015195


##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java:
##########
@@ -59,11 +96,41 @@ void testToSplitType() throws Exception {
     }
 
     @Test
-    void testOnSplitFinishedIsNoOp() throws Exception {
-        KinesisStreamsSourceReader<TestData> sourceReader =
-                new KinesisStreamsSourceReader<>(
-                        null, null, null, new Configuration(), new 
TestingReaderContext());
-        assertThatNoException()
-                .isThrownBy(() -> 
sourceReader.onSplitFinished(Collections.emptyMap()));
+    void testOnSplitFinishedShardMetricGroupUnregistered() throws Exception {
+        KinesisShardSplit split = getTestSplit();
+
+        List<KinesisShardSplit> splits = Collections.singletonList(split);
+        sourceReader.addSplits(splits);
+        
assertThat(kinesisSourceShardMetrics.getShardMetricGroupMap().get(split.getShardId()))
+                .isNotNull();
+
+        sourceReader.onSplitFinished(
+                Collections.singletonMap(split.getShardId(), new 
KinesisShardSplitState(split)));
+        
assertThat(kinesisSourceShardMetrics.getShardMetricGroupMap().get(split.getShardId()))
+                .isNull();
+    }
+
+    @Test
+    void testAddSplitsRegistersAndUpdatesShardMetricGroup() throws Exception {
+        KinesisShardSplit split = getTestSplit();
+
+        List<KinesisShardSplit> splits = Collections.singletonList(split);
+        sourceReader.addSplits(splits);
+
+        // Wait for fetcher tasks to finish to assert after the metric is 
registered and updated.
+        sourceReader
+                .isAvailable()
+                .whenComplete(
+                        (result, throwable) -> {
+                            assertThat(
+                                            kinesisSourceShardMetrics
+                                                    .getShardMetricGroupMap()
+                                                    .get(split.getShardId()))
+                                    .isNotNull();
+                            TestUtil.assertMillisBehindLatest(
+                                    split,
+                                    TestUtil.MILLIS_BEHIND_LATEST_TEST_VALUE,
+                                    metricListener);
+                        });

Review Comment:
   Thank you for spotting this! Updated and verified the assertion is executed 
in the same thread.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to