This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 2ec095a  [FLINK-23705][connectors/kinesis] Unregistering metric reporting for closed shards
2ec095a is described below

commit 2ec095a67bccda8033b4ce3e433f0a657706ae84
Author: Elphas Toringepi <[email protected]>
AuthorDate: Thu Aug 12 17:46:29 2021 +0100

    [FLINK-23705][connectors/kinesis] Unregistering metric reporting for closed shards
---
 .../kinesis/internals/ShardConsumer.java           |  2 +
 .../metrics/ShardConsumerMetricsReporter.java      | 10 ++++
 .../internals/DynamoDBStreamsDataFetcherTest.java  |  5 +-
 .../kinesis/internals/ShardConsumerTest.java       | 28 ++++++++++
 .../kinesis/internals/ShardConsumerTestUtils.java  | 64 +++++++++++++++++++---
 .../polling/PollingRecordPublisherFactoryTest.java |  6 +-
 .../polling/PollingRecordPublisherTest.java        |  8 ++-
 .../metrics/ShardConsumerMetricsReporterTest.java  | 15 +++++
 .../connectors/kinesis/testutils/TestUtils.java    |  4 +-
 9 files changed, 126 insertions(+), 16 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 31af3f1..85dc8f4 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -158,6 +158,8 @@ public class ShardConsumer<T> implements Runnable {
             }
         } catch (Throwable t) {
             fetcherRef.stopWithError(t);
+        } finally {
+            this.shardConsumerMetricsReporter.unregister();
         }
     }
 
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporter.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporter.java
index 777cc61..d8fdf99 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporter.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporter.java
@@ -20,18 +20,22 @@ package 
org.apache.flink.streaming.connectors.kinesis.metrics;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
 
 /** A container for {@link ShardConsumer}s to report metric values. */
 @Internal
 public class ShardConsumerMetricsReporter {
 
+    private final MetricGroup metricGroup;
+
     private volatile long millisBehindLatest = -1;
     private volatile long averageRecordSizeBytes = 0L;
     private volatile int numberOfAggregatedRecords = 0;
     private volatile int numberOfDeaggregatedRecords = 0;
 
     public ShardConsumerMetricsReporter(final MetricGroup metricGroup) {
+        this.metricGroup = metricGroup;
         metricGroup.gauge(
                 KinesisConsumerMetricConstants.MILLIS_BEHIND_LATEST_GAUGE,
                 this::getMillisBehindLatest);
@@ -77,4 +81,10 @@ public class ShardConsumerMetricsReporter {
     public void setNumberOfDeaggregatedRecords(int 
numberOfDeaggregatedRecords) {
         this.numberOfDeaggregatedRecords = numberOfDeaggregatedRecords;
     }
+
+    public void unregister() {
+        if (this.metricGroup instanceof AbstractMetricGroup) {
+            ((AbstractMetricGroup) this.metricGroup).close();
+        }
+    }
 }
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcherTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcherTest.java
index 91b1abc..b847b30 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcherTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcherTest.java
@@ -19,6 +19,7 @@ package 
org.apache.flink.streaming.connectors.kinesis.internals;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
@@ -32,6 +33,7 @@ import java.util.Properties;
 import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
 import static java.util.Collections.singletonList;
 import static 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER;
+import static 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerTestUtils.createFakeShardConsumerMetricGroup;
 import static 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -60,7 +62,8 @@ public class DynamoDBStreamsDataFetcherTest {
         fetcher.createRecordPublisher(
                 SENTINEL_LATEST_SEQUENCE_NUM.get(),
                 new Properties(),
-                runtimeContext.getMetricGroup(),
+                createFakeShardConsumerMetricGroup(
+                        (OperatorMetricGroup) runtimeContext.getMetricGroup()),
                 dummyStreamShardHandle);
 
         verify(kinesis).getShardIterator(dummyStreamShardHandle, 
LATEST.toString(), null);
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index 7844dc3..22b5d03 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory;
 import 
org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
@@ -33,10 +34,12 @@ import java.util.Properties;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
+import static 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerTestUtils.createFakeShardConsumerMetricGroup;
 import static 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerTestUtils.fakeSequenceNumber;
 import static 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;
 import static 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.eq;
@@ -59,6 +62,17 @@ public class ShardConsumerTest {
     }
 
     @Test
+    public void 
testConsumerAndProducerMetricsAreUnregisteredAfterShardCompletes()
+            throws Exception {
+        KinesisProxyInterface kinesis =
+                
FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(
+                        500, 5, 500);
+        AbstractMetricGroup metricGroup = createFakeShardConsumerMetricGroup();
+        assertNumberOfMessagesReceivedFromKinesis(500, kinesis, 
fakeSequenceNumber(), metricGroup);
+        assertTrue(metricGroup.isClosed());
+    }
+
+    @Test
     public void testTimestampStartingPositionWithEmptyShard() throws Exception 
{
         Properties consumerProperties = new Properties();
         consumerProperties.setProperty(
@@ -207,6 +221,20 @@ public class ShardConsumerTest {
             final int expectedNumberOfMessages,
             final KinesisProxyInterface kinesis,
             final SequenceNumber startingSequenceNumber,
+            final AbstractMetricGroup metricGroup)
+            throws Exception {
+        return 
ShardConsumerTestUtils.assertNumberOfMessagesReceivedFromKinesis(
+                expectedNumberOfMessages,
+                new PollingRecordPublisherFactory(config -> kinesis),
+                startingSequenceNumber,
+                new Properties(),
+                metricGroup);
+    }
+
+    private ShardConsumerMetricsReporter 
assertNumberOfMessagesReceivedFromKinesis(
+            final int expectedNumberOfMessages,
+            final KinesisProxyInterface kinesis,
+            final SequenceNumber startingSequenceNumber,
             final Properties consumerProperties)
             throws Exception {
 
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java
index c34b60e..1c746a6 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java
@@ -18,9 +18,12 @@
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
 import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;
+import 
org.apache.flink.streaming.connectors.kinesis.metrics.KinesisConsumerMetricConstants;
 import 
org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
 import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
@@ -47,12 +50,11 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM;
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
 
 /** Tests for the {@link ShardConsumer}. */
 public class ShardConsumerTestUtils {
 
-    public static <T> ShardConsumerMetricsReporter 
assertNumberOfMessagesReceivedFromKinesis(
+    public static ShardConsumerMetricsReporter 
assertNumberOfMessagesReceivedFromKinesis(
             final int expectedNumberOfMessages,
             final RecordPublisherFactory recordPublisherFactory,
             final SequenceNumber startingSequenceNumber,
@@ -66,15 +68,32 @@ public class ShardConsumerTestUtils {
                 SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
     }
 
-    public static <T> ShardConsumerMetricsReporter 
assertNumberOfMessagesReceivedFromKinesis(
+    public static ShardConsumerMetricsReporter 
assertNumberOfMessagesReceivedFromKinesis(
             final int expectedNumberOfMessages,
             final RecordPublisherFactory recordPublisherFactory,
             final SequenceNumber startingSequenceNumber,
             final Properties consumerProperties,
-            final SequenceNumber expectedLastProcessedSequenceNum)
+            final AbstractMetricGroup metricGroup)
+            throws InterruptedException {
+        return assertNumberOfMessagesReceivedFromKinesis(
+                expectedNumberOfMessages,
+                recordPublisherFactory,
+                startingSequenceNumber,
+                consumerProperties,
+                SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
+                metricGroup);
+    }
+
+    public static ShardConsumerMetricsReporter 
assertNumberOfMessagesReceivedFromKinesis(
+            final int expectedNumberOfMessages,
+            final RecordPublisherFactory recordPublisherFactory,
+            final SequenceNumber startingSequenceNumber,
+            final Properties consumerProperties,
+            final SequenceNumber expectedLastProcessedSequenceNum,
+            final AbstractMetricGroup metricGroup)
             throws InterruptedException {
         ShardConsumerMetricsReporter shardMetricsReporter =
-                new ShardConsumerMetricsReporter(mock(MetricGroup.class));
+                new ShardConsumerMetricsReporter(metricGroup);
 
         StreamShardHandle fakeToBeConsumedShard = 
getMockStreamShard("fakeStream", 0);
 
@@ -116,7 +135,7 @@ public class ShardConsumerTestUtils {
                 recordPublisherFactory.create(
                         startingPosition,
                         fetcher.getConsumerConfiguration(),
-                        mock(MetricGroup.class),
+                        metricGroup,
                         shardHandle);
 
         int shardIndex =
@@ -139,6 +158,22 @@ public class ShardConsumerTestUtils {
         return shardMetricsReporter;
     }
 
+    public static ShardConsumerMetricsReporter 
assertNumberOfMessagesReceivedFromKinesis(
+            final int expectedNumberOfMessages,
+            final RecordPublisherFactory recordPublisherFactory,
+            final SequenceNumber startingSequenceNumber,
+            final Properties consumerProperties,
+            final SequenceNumber expectedLastProcessedSequenceNum)
+            throws InterruptedException {
+        return assertNumberOfMessagesReceivedFromKinesis(
+                expectedNumberOfMessages,
+                recordPublisherFactory,
+                startingSequenceNumber,
+                consumerProperties,
+                expectedLastProcessedSequenceNum,
+                createFakeShardConsumerMetricGroup());
+    }
+
     public static StreamShardHandle getMockStreamShard(String streamName, int 
shardId) {
         return new StreamShardHandle(
                 streamName,
@@ -155,4 +190,19 @@ public class ShardConsumerTestUtils {
     public static SequenceNumber fakeSequenceNumber() {
         return new SequenceNumber("fakeStartingState");
     }
+
+    public static AbstractMetricGroup createFakeShardConsumerMetricGroup(
+            OperatorMetricGroup metricGroup) {
+        return (AbstractMetricGroup)
+                metricGroup
+                        
.addGroup(KinesisConsumerMetricConstants.STREAM_METRICS_GROUP, "fakeStream")
+                        .addGroup(
+                                
KinesisConsumerMetricConstants.SHARD_METRICS_GROUP,
+                                "shardId-000000000000");
+    }
+
+    public static AbstractMetricGroup createFakeShardConsumerMetricGroup() {
+        return createFakeShardConsumerMetricGroup(
+                
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup());
+    }
 }
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactoryTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactoryTest.java
index 81cb207..0bf5688 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactoryTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactoryTest.java
@@ -17,7 +17,6 @@
 
 package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
 
-import org.apache.flink.metrics.MetricGroup;
 import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
 import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
@@ -28,6 +27,7 @@ import org.junit.Test;
 import java.util.Properties;
 
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS;
+import static 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerTestUtils.createFakeShardConsumerMetricGroup;
 import static 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -46,7 +46,7 @@ public class PollingRecordPublisherFactoryTest {
                         StartingPosition.restartFromSequenceNumber(
                                 SENTINEL_LATEST_SEQUENCE_NUM.get()),
                         new Properties(),
-                        mock(MetricGroup.class),
+                        createFakeShardConsumerMetricGroup(),
                         mock(StreamShardHandle.class));
 
         assertTrue(recordPublisher instanceof PollingRecordPublisher);
@@ -63,7 +63,7 @@ public class PollingRecordPublisherFactoryTest {
                         StartingPosition.restartFromSequenceNumber(
                                 SENTINEL_LATEST_SEQUENCE_NUM.get()),
                         properties,
-                        mock(MetricGroup.class),
+                        createFakeShardConsumerMetricGroup(),
                         mock(StreamShardHandle.class));
 
         assertTrue(recordPublisher instanceof PollingRecordPublisher);
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
index ad93f9c..2399ec8 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
@@ -17,7 +17,6 @@
 
 package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
 
-import org.apache.flink.metrics.MetricGroup;
 import 
org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
 import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
 import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
@@ -29,6 +28,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import static 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerTestUtils.createFakeShardConsumerMetricGroup;
 import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
 import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
 import static 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
@@ -64,7 +64,9 @@ public class PollingRecordPublisherTest {
     @Test
     public void testRunEmitsRunLoopTimeNanos() throws Exception {
         PollingRecordPublisherMetricsReporter metricsReporter =
-                spy(new 
PollingRecordPublisherMetricsReporter(mock(MetricGroup.class)));
+                spy(
+                        new PollingRecordPublisherMetricsReporter(
+                                createFakeShardConsumerMetricGroup()));
 
         KinesisProxyInterface fakeKinesis = 
totalNumOfRecordsAfterNumOfGetRecordsCalls(5, 5, 100);
         PollingRecordPublisher recordPublisher =
@@ -152,7 +154,7 @@ public class PollingRecordPublisherTest {
     PollingRecordPublisher createPollingRecordPublisher(final 
KinesisProxyInterface kinesis)
             throws Exception {
         PollingRecordPublisherMetricsReporter metricsReporter =
-                new 
PollingRecordPublisherMetricsReporter(mock(MetricGroup.class));
+                new 
PollingRecordPublisherMetricsReporter(createFakeShardConsumerMetricGroup());
 
         return createPollingRecordPublisher(kinesis, metricsReporter);
     }
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporterTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporterTest.java
index a4f0f5b..cdfcd11 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporterTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporterTest.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.connectors.kinesis.metrics;
 
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerTestUtils;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -26,6 +28,7 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.verify;
@@ -67,4 +70,16 @@ public class ShardConsumerMetricsReporterTest {
         assertEquals(3, metricsReporter.getNumberOfAggregatedRecords());
         assertEquals(4, metricsReporter.getNumberOfDeaggregatedRecords());
     }
+
+    @Test
+    public void testUnregister() {
+        AbstractMetricGroup metricGroup =
+                ShardConsumerTestUtils.createFakeShardConsumerMetricGroup();
+        ShardConsumerMetricsReporter metricsReporter =
+                new ShardConsumerMetricsReporter(metricGroup);
+
+        metricsReporter.unregister();
+
+        assertTrue(metricGroup.isClosed());
+    }
 }
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
index 3cfae22..6d1bb8f 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
@@ -19,7 +19,6 @@ package 
org.apache.flink.streaming.connectors.kinesis.testutils;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
 import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
@@ -46,6 +45,7 @@ import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static 
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.NONE;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_REGISTRATION_TYPE;
@@ -164,7 +164,7 @@ public class TestUtils {
                 .thenReturn(Thread.currentThread().getContextClassLoader());
 
         Mockito.when(mockedRuntimeContext.getMetricGroup())
-                .thenReturn(new UnregisteredMetricsGroup());
+                .thenReturn(createUnregisteredOperatorMetricGroup());
 
         return mockedRuntimeContext;
     }

Reply via email to