This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 2ec095a [FLINK-23705][connectors/kinesis] Unregistering metric
reporting for closed shards
2ec095a is described below
commit 2ec095a67bccda8033b4ce3e433f0a657706ae84
Author: Elphas Toringepi <[email protected]>
AuthorDate: Thu Aug 12 17:46:29 2021 +0100
[FLINK-23705][connectors/kinesis] Unregistering metric reporting for closed
shards
---
.../kinesis/internals/ShardConsumer.java | 2 +
.../metrics/ShardConsumerMetricsReporter.java | 10 ++++
.../internals/DynamoDBStreamsDataFetcherTest.java | 5 +-
.../kinesis/internals/ShardConsumerTest.java | 28 ++++++++++
.../kinesis/internals/ShardConsumerTestUtils.java | 64 +++++++++++++++++++---
.../polling/PollingRecordPublisherFactoryTest.java | 6 +-
.../polling/PollingRecordPublisherTest.java | 8 ++-
.../metrics/ShardConsumerMetricsReporterTest.java | 15 +++++
.../connectors/kinesis/testutils/TestUtils.java | 4 +-
9 files changed, 126 insertions(+), 16 deletions(-)
diff --git
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 31af3f1..85dc8f4 100644
---
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -158,6 +158,8 @@ public class ShardConsumer<T> implements Runnable {
}
} catch (Throwable t) {
fetcherRef.stopWithError(t);
+ } finally {
+ this.shardConsumerMetricsReporter.unregister();
}
}
diff --git
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporter.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporter.java
index 777cc61..d8fdf99 100644
---
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporter.java
+++
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporter.java
@@ -20,18 +20,22 @@ package
org.apache.flink.streaming.connectors.kinesis.metrics;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
/** A container for {@link ShardConsumer}s to report metric values. */
@Internal
public class ShardConsumerMetricsReporter {
+ private final MetricGroup metricGroup;
+
private volatile long millisBehindLatest = -1;
private volatile long averageRecordSizeBytes = 0L;
private volatile int numberOfAggregatedRecords = 0;
private volatile int numberOfDeaggregatedRecords = 0;
public ShardConsumerMetricsReporter(final MetricGroup metricGroup) {
+ this.metricGroup = metricGroup;
metricGroup.gauge(
KinesisConsumerMetricConstants.MILLIS_BEHIND_LATEST_GAUGE,
this::getMillisBehindLatest);
@@ -77,4 +81,10 @@ public class ShardConsumerMetricsReporter {
public void setNumberOfDeaggregatedRecords(int
numberOfDeaggregatedRecords) {
this.numberOfDeaggregatedRecords = numberOfDeaggregatedRecords;
}
+
+ public void unregister() {
+ if (this.metricGroup instanceof AbstractMetricGroup) {
+ ((AbstractMetricGroup) this.metricGroup).close();
+ }
+ }
}
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcherTest.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcherTest.java
index 91b1abc..b847b30 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcherTest.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcherTest.java
@@ -19,6 +19,7 @@ package
org.apache.flink.streaming.connectors.kinesis.internals;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
@@ -32,6 +33,7 @@ import java.util.Properties;
import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
import static java.util.Collections.singletonList;
import static
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER;
+import static
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerTestUtils.createFakeShardConsumerMetricGroup;
import static
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -60,7 +62,8 @@ public class DynamoDBStreamsDataFetcherTest {
fetcher.createRecordPublisher(
SENTINEL_LATEST_SEQUENCE_NUM.get(),
new Properties(),
- runtimeContext.getMetricGroup(),
+ createFakeShardConsumerMetricGroup(
+ (OperatorMetricGroup) runtimeContext.getMetricGroup()),
dummyStreamShardHandle);
verify(kinesis).getShardIterator(dummyStreamShardHandle,
LATEST.toString(), null);
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index 7844dc3..22b5d03 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.kinesis.internals;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory;
import
org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
@@ -33,10 +34,12 @@ import java.util.Properties;
import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS;
import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
+import static
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerTestUtils.createFakeShardConsumerMetricGroup;
import static
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerTestUtils.fakeSequenceNumber;
import static
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;
import static
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
@@ -59,6 +62,17 @@ public class ShardConsumerTest {
}
@Test
+ public void testConsumerAndProducerMetricsAreUnregisteredAfterShardCompletes()
+ throws Exception {
+ KinesisProxyInterface kinesis =
+ FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(
+ 500, 5, 500);
+ AbstractMetricGroup metricGroup = createFakeShardConsumerMetricGroup();
+ assertNumberOfMessagesReceivedFromKinesis(500, kinesis, fakeSequenceNumber(), metricGroup);
+ assertTrue(metricGroup.isClosed());
+ }
+
+ @Test
public void testTimestampStartingPositionWithEmptyShard() throws Exception
{
Properties consumerProperties = new Properties();
consumerProperties.setProperty(
@@ -207,6 +221,20 @@ public class ShardConsumerTest {
final int expectedNumberOfMessages,
final KinesisProxyInterface kinesis,
final SequenceNumber startingSequenceNumber,
+ final AbstractMetricGroup metricGroup)
+ throws Exception {
+ return ShardConsumerTestUtils.assertNumberOfMessagesReceivedFromKinesis(
+ expectedNumberOfMessages,
+ new PollingRecordPublisherFactory(config -> kinesis),
+ startingSequenceNumber,
+ new Properties(),
+ metricGroup);
+ }
+
+ private ShardConsumerMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
+ final int expectedNumberOfMessages,
+ final KinesisProxyInterface kinesis,
+ final SequenceNumber startingSequenceNumber,
final Properties consumerProperties)
throws Exception {
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java
index c34b60e..1c746a6 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java
@@ -18,9 +18,12 @@
package org.apache.flink.streaming.connectors.kinesis.internals;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
import
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;
+import
org.apache.flink.streaming.connectors.kinesis.metrics.KinesisConsumerMetricConstants;
import
org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
import
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
@@ -47,12 +50,11 @@ import java.util.concurrent.atomic.AtomicReference;
import static
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
/** Tests for the {@link ShardConsumer}. */
public class ShardConsumerTestUtils {
- public static <T> ShardConsumerMetricsReporter
assertNumberOfMessagesReceivedFromKinesis(
+ public static ShardConsumerMetricsReporter
assertNumberOfMessagesReceivedFromKinesis(
final int expectedNumberOfMessages,
final RecordPublisherFactory recordPublisherFactory,
final SequenceNumber startingSequenceNumber,
@@ -66,15 +68,32 @@ public class ShardConsumerTestUtils {
SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
}
- public static <T> ShardConsumerMetricsReporter
assertNumberOfMessagesReceivedFromKinesis(
+ public static ShardConsumerMetricsReporter
assertNumberOfMessagesReceivedFromKinesis(
final int expectedNumberOfMessages,
final RecordPublisherFactory recordPublisherFactory,
final SequenceNumber startingSequenceNumber,
final Properties consumerProperties,
- final SequenceNumber expectedLastProcessedSequenceNum)
+ final AbstractMetricGroup metricGroup)
+ throws InterruptedException {
+ return assertNumberOfMessagesReceivedFromKinesis(
+ expectedNumberOfMessages,
+ recordPublisherFactory,
+ startingSequenceNumber,
+ consumerProperties,
+ SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
+ metricGroup);
+ }
+
+ public static ShardConsumerMetricsReporter
assertNumberOfMessagesReceivedFromKinesis(
+ final int expectedNumberOfMessages,
+ final RecordPublisherFactory recordPublisherFactory,
+ final SequenceNumber startingSequenceNumber,
+ final Properties consumerProperties,
+ final SequenceNumber expectedLastProcessedSequenceNum,
+ final AbstractMetricGroup metricGroup)
throws InterruptedException {
ShardConsumerMetricsReporter shardMetricsReporter =
- new ShardConsumerMetricsReporter(mock(MetricGroup.class));
+ new ShardConsumerMetricsReporter(metricGroup);
StreamShardHandle fakeToBeConsumedShard =
getMockStreamShard("fakeStream", 0);
@@ -116,7 +135,7 @@ public class ShardConsumerTestUtils {
recordPublisherFactory.create(
startingPosition,
fetcher.getConsumerConfiguration(),
- mock(MetricGroup.class),
+ metricGroup,
shardHandle);
int shardIndex =
@@ -139,6 +158,22 @@ public class ShardConsumerTestUtils {
return shardMetricsReporter;
}
+ public static ShardConsumerMetricsReporter
assertNumberOfMessagesReceivedFromKinesis(
+ final int expectedNumberOfMessages,
+ final RecordPublisherFactory recordPublisherFactory,
+ final SequenceNumber startingSequenceNumber,
+ final Properties consumerProperties,
+ final SequenceNumber expectedLastProcessedSequenceNum)
+ throws InterruptedException {
+ return assertNumberOfMessagesReceivedFromKinesis(
+ expectedNumberOfMessages,
+ recordPublisherFactory,
+ startingSequenceNumber,
+ consumerProperties,
+ expectedLastProcessedSequenceNum,
+ createFakeShardConsumerMetricGroup());
+ }
+
public static StreamShardHandle getMockStreamShard(String streamName, int
shardId) {
return new StreamShardHandle(
streamName,
@@ -155,4 +190,19 @@ public class ShardConsumerTestUtils {
public static SequenceNumber fakeSequenceNumber() {
return new SequenceNumber("fakeStartingState");
}
+
+ public static AbstractMetricGroup createFakeShardConsumerMetricGroup(
+ OperatorMetricGroup metricGroup) {
+ return (AbstractMetricGroup)
+ metricGroup
+ .addGroup(KinesisConsumerMetricConstants.STREAM_METRICS_GROUP, "fakeStream")
+ .addGroup(
+ KinesisConsumerMetricConstants.SHARD_METRICS_GROUP,
+ "shardId-000000000000");
+ }
+
+ public static AbstractMetricGroup createFakeShardConsumerMetricGroup() {
+ return createFakeShardConsumerMetricGroup(
+ UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup());
+ }
}
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactoryTest.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactoryTest.java
index 81cb207..0bf5688 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactoryTest.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactoryTest.java
@@ -17,7 +17,6 @@
package
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
-import org.apache.flink.metrics.MetricGroup;
import
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
@@ -28,6 +27,7 @@ import org.junit.Test;
import java.util.Properties;
import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS;
+import static
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerTestUtils.createFakeShardConsumerMetricGroup;
import static
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -46,7 +46,7 @@ public class PollingRecordPublisherFactoryTest {
StartingPosition.restartFromSequenceNumber(
SENTINEL_LATEST_SEQUENCE_NUM.get()),
new Properties(),
- mock(MetricGroup.class),
+ createFakeShardConsumerMetricGroup(),
mock(StreamShardHandle.class));
assertTrue(recordPublisher instanceof PollingRecordPublisher);
@@ -63,7 +63,7 @@ public class PollingRecordPublisherFactoryTest {
StartingPosition.restartFromSequenceNumber(
SENTINEL_LATEST_SEQUENCE_NUM.get()),
properties,
- mock(MetricGroup.class),
+ createFakeShardConsumerMetricGroup(),
mock(StreamShardHandle.class));
assertTrue(recordPublisher instanceof PollingRecordPublisher);
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
index ad93f9c..2399ec8 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
@@ -17,7 +17,6 @@
package
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
-import org.apache.flink.metrics.MetricGroup;
import
org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
import
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
@@ -29,6 +28,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import static
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerTestUtils.createFakeShardConsumerMetricGroup;
import static
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
import static
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
import static
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
@@ -64,7 +64,9 @@ public class PollingRecordPublisherTest {
@Test
public void testRunEmitsRunLoopTimeNanos() throws Exception {
PollingRecordPublisherMetricsReporter metricsReporter =
- spy(new PollingRecordPublisherMetricsReporter(mock(MetricGroup.class)));
+ spy(
+ new PollingRecordPublisherMetricsReporter(
+ createFakeShardConsumerMetricGroup()));
KinesisProxyInterface fakeKinesis =
totalNumOfRecordsAfterNumOfGetRecordsCalls(5, 5, 100);
PollingRecordPublisher recordPublisher =
@@ -152,7 +154,7 @@ public class PollingRecordPublisherTest {
PollingRecordPublisher createPollingRecordPublisher(final
KinesisProxyInterface kinesis)
throws Exception {
PollingRecordPublisherMetricsReporter metricsReporter =
- new PollingRecordPublisherMetricsReporter(mock(MetricGroup.class));
+ new PollingRecordPublisherMetricsReporter(createFakeShardConsumerMetricGroup());
return createPollingRecordPublisher(kinesis, metricsReporter);
}
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporterTest.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporterTest.java
index a4f0f5b..cdfcd11 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporterTest.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporterTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.streaming.connectors.kinesis.metrics;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerTestUtils;
import org.junit.Before;
import org.junit.Test;
@@ -26,6 +28,7 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
@@ -67,4 +70,16 @@ public class ShardConsumerMetricsReporterTest {
assertEquals(3, metricsReporter.getNumberOfAggregatedRecords());
assertEquals(4, metricsReporter.getNumberOfDeaggregatedRecords());
}
+
+ @Test
+ public void testUnregister() {
+ AbstractMetricGroup metricGroup =
+ ShardConsumerTestUtils.createFakeShardConsumerMetricGroup();
+ ShardConsumerMetricsReporter metricsReporter =
+ new ShardConsumerMetricsReporter(metricGroup);
+
+ metricsReporter.unregister();
+
+ assertTrue(metricGroup.isClosed());
+ }
}
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
index 3cfae22..6d1bb8f 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
@@ -19,7 +19,6 @@ package
org.apache.flink.streaming.connectors.kinesis.testutils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
import
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
@@ -46,6 +45,7 @@ import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import static
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup;
import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.NONE;
import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX;
import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_REGISTRATION_TYPE;
@@ -164,7 +164,7 @@ public class TestUtils {
.thenReturn(Thread.currentThread().getContextClassLoader());
Mockito.when(mockedRuntimeContext.getMetricGroup())
- .thenReturn(new UnregisteredMetricsGroup());
+ .thenReturn(createUnregisteredOperatorMetricGroup());
return mockedRuntimeContext;
}