This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3887b1d [FLINK-24431][kinesis][efo] Stop consumer deregistration when
EAGER EFO configured. (#17417)
3887b1d is described below
commit 3887b1d1edd717385813b73a8127c93845f6b6c2
Author: Rudi Kershaw <[email protected]>
AuthorDate: Thu Oct 7 14:38:21 2021 +0100
[FLINK-24431][kinesis][efo] Stop consumer deregistration when EAGER EFO
configured. (#17417)
* FLINK-24431 Stop consumer deregistration when EAGER EFO configured.
* FLINK-24431 Verify criteria for stream deregistration in unit tests.
* FLINK-24431 Update documentation to reflect new EAGER EFO strategy
changes.
* FLINK-24431 Remove Java 11 language feature usage in consumer unit test.
* FLINK-24431 Untranslated zh documentation updated to match kinesis
documentation changes.
---
.../docs/connectors/datastream/kinesis.md | 5 +--
docs/content.zh/docs/connectors/table/kinesis.md | 4 +-
docs/content/docs/connectors/datastream/kinesis.md | 5 +--
docs/content/docs/connectors/table/kinesis.md | 4 +-
.../kinesis/util/StreamConsumerRegistrarUtil.java | 46 +++++++++++-----------
.../util/StreamConsumerRegistrarUtilTest.java | 30 +++++++++++---
6 files changed, 54 insertions(+), 40 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/kinesis.md
b/docs/content.zh/docs/connectors/datastream/kinesis.md
index 934d743..edfb28a 100644
--- a/docs/content.zh/docs/connectors/datastream/kinesis.md
+++ b/docs/content.zh/docs/connectors/datastream/kinesis.md
@@ -552,8 +552,7 @@ Retry and backoff parameters can be configured using the
this is called during stream consumer registration and deregistration. For
each stream this service will be invoked
periodically until the stream consumer is reported `ACTIVE`/`not found` for
registration/deregistration. By default,
the `LAZY` registration strategy will scale the number of calls by the job
parallelism. `EAGER` will call the service
-once per stream for registration, and scale the number of calls by the job
parallelism for deregistration.
-`NONE` will not invoke this service. Retry and backoff parameters can be
configured using the
+once per stream for registration only. `NONE` will not invoke this service.
Retry and backoff parameters can be configured using the
`ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_*` keys.
-
*[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)*:
@@ -561,7 +560,7 @@ this is called once per stream during stream consumer
registration, unless the `
Retry and backoff parameters can be configured using the
`ConsumerConfigConstants.REGISTER_STREAM_*` keys.
-
*[DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)*:
-this is called once per stream during stream consumer deregistration, unless
the `NONE` registration strategy is configured.
+this is called once per stream during stream consumer deregistration, unless
the `NONE` or `EAGER` registration strategy is configured.
Retry and backoff parameters can be configured using the
`ConsumerConfigConstants.DEREGISTER_STREAM_*` keys.
## Kinesis Producer
diff --git a/docs/content.zh/docs/connectors/table/kinesis.md
b/docs/content.zh/docs/connectors/table/kinesis.md
index dfbc8d8..cc45b88 100644
--- a/docs/content.zh/docs/connectors/table/kinesis.md
+++ b/docs/content.zh/docs/connectors/table/kinesis.md
@@ -728,10 +728,10 @@ You can enable and configure EFO with the following
properties:
However, consumer names do not have to be unique across data streams.
Reusing a consumer name will result in existing subscriptions being terminated.
-<span class="label label-info">Note</span> With the `LAZY` and `EAGER`
strategies, stream consumers are de-registered when the job is shutdown
gracefully.
+<span class="label label-info">Note</span> With the `LAZY` strategy, stream
consumers are de-registered when the job is shutdown gracefully.
In the event that a job terminates within executing the shutdown hooks, stream
consumers will remain active.
In this situation the stream consumers will be gracefully reused when the
application restarts.
-With the `NONE` strategy, stream consumer de-registration is not performed by
`FlinkKinesisConsumer`.
+With the `NONE` and `EAGER` strategies, stream consumer de-registration is not
performed by `FlinkKinesisConsumer`.
Data Type Mapping
----------------
diff --git a/docs/content/docs/connectors/datastream/kinesis.md
b/docs/content/docs/connectors/datastream/kinesis.md
index 1c2e309..87cba33 100644
--- a/docs/content/docs/connectors/datastream/kinesis.md
+++ b/docs/content/docs/connectors/datastream/kinesis.md
@@ -552,8 +552,7 @@ Retry and backoff parameters can be configured using the
this is called during stream consumer registration and deregistration. For
each stream this service will be invoked
periodically until the stream consumer is reported `ACTIVE`/`not found` for
registration/deregistration. By default,
the `LAZY` registration strategy will scale the number of calls by the job
parallelism. `EAGER` will call the service
-once per stream for registration, and scale the number of calls by the job
parallelism for deregistration.
-`NONE` will not invoke this service. Retry and backoff parameters can be
configured using the
+once per stream for registration only. `NONE` will not invoke this service.
Retry and backoff parameters can be configured using the
`ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_*` keys.
-
*[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)*:
@@ -561,7 +560,7 @@ this is called once per stream during stream consumer
registration, unless the `
Retry and backoff parameters can be configured using the
`ConsumerConfigConstants.REGISTER_STREAM_*` keys.
-
*[DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)*:
-this is called once per stream during stream consumer deregistration, unless
the `NONE` registration strategy is configured.
+this is called once per stream during stream consumer deregistration, unless
the `NONE` or `EAGER` registration strategy is configured.
Retry and backoff parameters can be configured using the
`ConsumerConfigConstants.DEREGISTER_STREAM_*` keys.
## Kinesis Producer
diff --git a/docs/content/docs/connectors/table/kinesis.md
b/docs/content/docs/connectors/table/kinesis.md
index d88ea86..0b3679c 100644
--- a/docs/content/docs/connectors/table/kinesis.md
+++ b/docs/content/docs/connectors/table/kinesis.md
@@ -728,10 +728,10 @@ You can enable and configure EFO with the following
properties:
However, consumer names do not have to be unique across data streams.
Reusing a consumer name will result in existing subscriptions being terminated.
-<span class="label label-info">Note</span> With the `LAZY` and `EAGER`
strategies, stream consumers are de-registered when the job is shutdown
gracefully.
+<span class="label label-info">Note</span> With the `LAZY` strategy, stream
consumers are de-registered when the job is shutdown gracefully.
In the event that a job terminates within executing the shutdown hooks, stream
consumers will remain active.
In this situation the stream consumers will be gracefully reused when the
application restarts.
-With the `NONE` strategy, stream consumer de-registration is not performed by
`FlinkKinesisConsumer`.
+With the `NONE` and `EAGER` strategies, stream consumer de-registration is not
performed by `FlinkKinesisConsumer`.
Data Type Mapping
----------------
diff --git
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/StreamConsumerRegistrarUtil.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/StreamConsumerRegistrarUtil.java
index ea7f263..44b6422 100644
---
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/StreamConsumerRegistrarUtil.java
+++
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/StreamConsumerRegistrarUtil.java
@@ -34,7 +34,6 @@ import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfi
import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.efoConsumerArn;
import static
org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util.isEagerEfoRegistrationType;
import static
org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util.isLazyEfoRegistrationType;
-import static
org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util.isNoneEfoRegistrationType;
import static
org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util.isUsingEfoRecordPublisher;
/**
@@ -85,17 +84,18 @@ public class StreamConsumerRegistrarUtil {
*/
public static void deregisterStreamConsumers(
final Properties configProps, final List<String> streams) {
- if (!isUsingEfoRecordPublisher(configProps) ||
isNoneEfoRegistrationType(configProps)) {
- return;
+ if (isConsumerDeregistrationRequired(configProps)) {
+ StreamConsumerRegistrar registrar =
createStreamConsumerRegistrar(configProps, streams);
+ try {
+ deregisterStreamConsumers(registrar, configProps, streams);
+ } finally {
+ registrar.close();
+ }
}
+ }
- StreamConsumerRegistrar registrar =
createStreamConsumerRegistrar(configProps, streams);
-
- try {
- deregisterStreamConsumers(registrar, configProps, streams);
- } finally {
- registrar.close();
- }
+ private static boolean isConsumerDeregistrationRequired(final Properties
configProps) {
+ return isUsingEfoRecordPublisher(configProps) &&
isLazyEfoRegistrationType(configProps);
}
private static void registerStreamConsumers(
@@ -137,20 +137,18 @@ public class StreamConsumerRegistrarUtil {
final StreamConsumerRegistrar registrar,
final Properties configProps,
final List<String> streams) {
- if (!isUsingEfoRecordPublisher(configProps) ||
isNoneEfoRegistrationType(configProps)) {
- return;
- }
-
- for (String stream : streams) {
- try {
- registrar.deregisterStreamConsumer(stream);
- } catch (ExecutionException ex) {
- throw new FlinkKinesisStreamConsumerRegistrarException(
- "Error deregistering stream: " + stream, ex);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- throw new FlinkKinesisStreamConsumerRegistrarException(
- "Error registering stream: " + stream, ex);
+ if (isConsumerDeregistrationRequired(configProps)) {
+ for (String stream : streams) {
+ try {
+ registrar.deregisterStreamConsumer(stream);
+ } catch (ExecutionException ex) {
+ throw new FlinkKinesisStreamConsumerRegistrarException(
+ "Error deregistering stream: " + stream, ex);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new FlinkKinesisStreamConsumerRegistrarException(
+ "Error registering stream: " + stream, ex);
+ }
}
}
}
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/StreamConsumerRegistrarUtilTest.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/StreamConsumerRegistrarUtilTest.java
index 7569b00..a3dd47d 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/StreamConsumerRegistrarUtilTest.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/StreamConsumerRegistrarUtilTest.java
@@ -25,13 +25,16 @@ import java.util.Arrays;
import java.util.List;
import java.util.Properties;
+import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.EAGER;
import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_CONSUMER_NAME;
+import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_REGISTRATION_TYPE;
import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.EFO;
import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.efoConsumerArn;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
/** Tests for {@link StreamConsumerRegistrar}. */
@@ -39,9 +42,7 @@ public class StreamConsumerRegistrarUtilTest {
@Test
public void testRegisterStreamConsumers() throws Exception {
- Properties configProps = new Properties();
- configProps.setProperty(EFO_CONSUMER_NAME, "consumer-name");
-
+ Properties configProps = getDefaultConfiguration();
StreamConsumerRegistrar registrar =
mock(StreamConsumerRegistrar.class);
when(registrar.registerStreamConsumer("stream-1", "consumer-name"))
.thenReturn("stream-1-consumer-arn");
@@ -57,10 +58,8 @@ public class StreamConsumerRegistrarUtilTest {
@Test
public void testDeregisterStreamConsumersMissingStreamArn() throws
Exception {
- Properties configProps = new Properties();
+ Properties configProps = getDefaultConfiguration();
configProps.setProperty(RECORD_PUBLISHER_TYPE, EFO.name());
- configProps.setProperty(EFO_CONSUMER_NAME, "consumer-name");
-
List<String> streams = Arrays.asList("stream-1", "stream-2");
StreamConsumerRegistrar registrar =
mock(StreamConsumerRegistrar.class);
@@ -69,4 +68,23 @@ public class StreamConsumerRegistrarUtilTest {
verify(registrar).deregisterStreamConsumer("stream-1");
verify(registrar).deregisterStreamConsumer("stream-2");
}
+
+ @Test
+ public void
testDeregisterStreamConsumersOnlyDeregistersEFOLazilyInitializedConsumers() {
+ Properties configProps = getDefaultConfiguration();
+ configProps.setProperty(RECORD_PUBLISHER_TYPE, EFO.name());
+ configProps.put(EFO_REGISTRATION_TYPE, EAGER.name());
+ List<String> streams = Arrays.asList("stream-1");
+ StreamConsumerRegistrar registrar =
mock(StreamConsumerRegistrar.class);
+
+ StreamConsumerRegistrarUtil.deregisterStreamConsumers(registrar,
configProps, streams);
+
+ verifyZeroInteractions(registrar);
+ }
+
+ private Properties getDefaultConfiguration() {
+ Properties configProps = new Properties();
+ configProps.setProperty(EFO_CONSUMER_NAME, "consumer-name");
+ return configProps;
+ }
}