This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3887b1d  [FLINK-24431][kinesis][efo] Stop consumer deregistration when 
EAGER EFO configured. (#17417)
3887b1d is described below

commit 3887b1d1edd717385813b73a8127c93845f6b6c2
Author: Rudi Kershaw <[email protected]>
AuthorDate: Thu Oct 7 14:38:21 2021 +0100

    [FLINK-24431][kinesis][efo] Stop consumer deregistration when EAGER EFO 
configured. (#17417)
    
    * FLINK-24431 Stop consumer deregistration when EAGER EFO configured.
    
    * FLINK-24431 Verify criteria for stream deregistration in unit tests.
    
    * FLINK-24431 Update documentation to reflect new EAGER EFO strategy 
changes.
    
    * FLINK-24431 Remove Java 11 language feature usage in consumer unit test.
    
    * FLINK-24431 Untranslated zh documentation updated to match kinesis 
documentation changes.
---
 .../docs/connectors/datastream/kinesis.md          |  5 +--
 docs/content.zh/docs/connectors/table/kinesis.md   |  4 +-
 docs/content/docs/connectors/datastream/kinesis.md |  5 +--
 docs/content/docs/connectors/table/kinesis.md      |  4 +-
 .../kinesis/util/StreamConsumerRegistrarUtil.java  | 46 +++++++++++-----------
 .../util/StreamConsumerRegistrarUtilTest.java      | 30 +++++++++++---
 6 files changed, 54 insertions(+), 40 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/kinesis.md 
b/docs/content.zh/docs/connectors/datastream/kinesis.md
index 934d743..edfb28a 100644
--- a/docs/content.zh/docs/connectors/datastream/kinesis.md
+++ b/docs/content.zh/docs/connectors/datastream/kinesis.md
@@ -552,8 +552,7 @@ Retry and backoff parameters can be configured using the
 this is called during stream consumer registration and deregistration. For 
each stream this service will be invoked 
 periodically until the stream consumer is reported `ACTIVE`/`not found` for 
registration/deregistration. By default,
 the `LAZY` registration strategy will scale the number of calls by the job 
parallelism. `EAGER` will call the service 
-once per stream for registration, and scale the number of calls by the job 
parallelism for deregistration. 
-`NONE` will not invoke this service. Retry and backoff parameters can be 
configured using the 
+once per stream for registration only. `NONE` will not invoke this service. 
Retry and backoff parameters can be configured using the 
 `ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_*` keys.  
 
 - 
*[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)*:
 
@@ -561,7 +560,7 @@ this is called once per stream during stream consumer 
registration, unless the `
 Retry and backoff parameters can be configured using the 
`ConsumerConfigConstants.REGISTER_STREAM_*` keys.
 
 - 
*[DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)*:
 
-this is called once per stream during stream consumer deregistration, unless 
the `NONE` registration strategy is configured.
+this is called once per stream during stream consumer deregistration, unless 
the `NONE` or `EAGER` registration strategy is configured.
 Retry and backoff parameters can be configured using the 
`ConsumerConfigConstants.DEREGISTER_STREAM_*` keys.  
 
 ## Kinesis Producer
diff --git a/docs/content.zh/docs/connectors/table/kinesis.md 
b/docs/content.zh/docs/connectors/table/kinesis.md
index dfbc8d8..cc45b88 100644
--- a/docs/content.zh/docs/connectors/table/kinesis.md
+++ b/docs/content.zh/docs/connectors/table/kinesis.md
@@ -728,10 +728,10 @@ You can enable and configure EFO with the following 
properties:
 However, consumer names do not have to be unique across data streams.
 Reusing a consumer name will result in existing subscriptions being terminated.
 
-<span class="label label-info">Note</span> With the `LAZY` and `EAGER` 
strategies, stream consumers are de-registered when the job is shutdown 
gracefully.
+<span class="label label-info">Note</span> With the `LAZY` strategy, stream 
consumers are de-registered when the job is shutdown gracefully.
 In the event that a job terminates within executing the shutdown hooks, stream 
consumers will remain active.
 In this situation the stream consumers will be gracefully reused when the 
application restarts.
-With the `NONE` strategy, stream consumer de-registration is not performed by 
`FlinkKinesisConsumer`.
+With the `NONE` and `EAGER` strategies, stream consumer de-registration is not 
performed by `FlinkKinesisConsumer`.
 
 Data Type Mapping
 ----------------
diff --git a/docs/content/docs/connectors/datastream/kinesis.md 
b/docs/content/docs/connectors/datastream/kinesis.md
index 1c2e309..87cba33 100644
--- a/docs/content/docs/connectors/datastream/kinesis.md
+++ b/docs/content/docs/connectors/datastream/kinesis.md
@@ -552,8 +552,7 @@ Retry and backoff parameters can be configured using the
 this is called during stream consumer registration and deregistration. For 
each stream this service will be invoked 
 periodically until the stream consumer is reported `ACTIVE`/`not found` for 
registration/deregistration. By default,
 the `LAZY` registration strategy will scale the number of calls by the job 
parallelism. `EAGER` will call the service 
-once per stream for registration, and scale the number of calls by the job 
parallelism for deregistration. 
-`NONE` will not invoke this service. Retry and backoff parameters can be 
configured using the 
+once per stream for registration only. `NONE` will not invoke this service. 
Retry and backoff parameters can be configured using the 
 `ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_*` keys.  
 
 - 
*[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)*:
 
@@ -561,7 +560,7 @@ this is called once per stream during stream consumer 
registration, unless the `
 Retry and backoff parameters can be configured using the 
`ConsumerConfigConstants.REGISTER_STREAM_*` keys.
 
 - 
*[DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)*:
 
-this is called once per stream during stream consumer deregistration, unless 
the `NONE` registration strategy is configured.
+this is called once per stream during stream consumer deregistration, unless 
the `NONE` or `EAGER` registration strategy is configured.
 Retry and backoff parameters can be configured using the 
`ConsumerConfigConstants.DEREGISTER_STREAM_*` keys.  
 
 ## Kinesis Producer
diff --git a/docs/content/docs/connectors/table/kinesis.md 
b/docs/content/docs/connectors/table/kinesis.md
index d88ea86..0b3679c 100644
--- a/docs/content/docs/connectors/table/kinesis.md
+++ b/docs/content/docs/connectors/table/kinesis.md
@@ -728,10 +728,10 @@ You can enable and configure EFO with the following 
properties:
 However, consumer names do not have to be unique across data streams.
 Reusing a consumer name will result in existing subscriptions being terminated.
 
-<span class="label label-info">Note</span> With the `LAZY` and `EAGER` 
strategies, stream consumers are de-registered when the job is shutdown 
gracefully.
+<span class="label label-info">Note</span> With the `LAZY` strategy, stream 
consumers are de-registered when the job is shutdown gracefully.
 In the event that a job terminates within executing the shutdown hooks, stream 
consumers will remain active.
 In this situation the stream consumers will be gracefully reused when the 
application restarts.
-With the `NONE` strategy, stream consumer de-registration is not performed by 
`FlinkKinesisConsumer`.
+With the `NONE` and `EAGER` strategies, stream consumer de-registration is not 
performed by `FlinkKinesisConsumer`.
 
 Data Type Mapping
 ----------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/StreamConsumerRegistrarUtil.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/StreamConsumerRegistrarUtil.java
index ea7f263..44b6422 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/StreamConsumerRegistrarUtil.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/StreamConsumerRegistrarUtil.java
@@ -34,7 +34,6 @@ import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfi
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.efoConsumerArn;
 import static 
org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util.isEagerEfoRegistrationType;
 import static 
org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util.isLazyEfoRegistrationType;
-import static 
org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util.isNoneEfoRegistrationType;
 import static 
org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util.isUsingEfoRecordPublisher;
 
 /**
@@ -85,17 +84,18 @@ public class StreamConsumerRegistrarUtil {
      */
     public static void deregisterStreamConsumers(
             final Properties configProps, final List<String> streams) {
-        if (!isUsingEfoRecordPublisher(configProps) || 
isNoneEfoRegistrationType(configProps)) {
-            return;
+        if (isConsumerDeregistrationRequired(configProps)) {
+            StreamConsumerRegistrar registrar = 
createStreamConsumerRegistrar(configProps, streams);
+            try {
+                deregisterStreamConsumers(registrar, configProps, streams);
+            } finally {
+                registrar.close();
+            }
         }
+    }
 
-        StreamConsumerRegistrar registrar = 
createStreamConsumerRegistrar(configProps, streams);
-
-        try {
-            deregisterStreamConsumers(registrar, configProps, streams);
-        } finally {
-            registrar.close();
-        }
+    private static boolean isConsumerDeregistrationRequired(final Properties 
configProps) {
+        return isUsingEfoRecordPublisher(configProps) && 
isLazyEfoRegistrationType(configProps);
     }
 
     private static void registerStreamConsumers(
@@ -137,20 +137,18 @@ public class StreamConsumerRegistrarUtil {
             final StreamConsumerRegistrar registrar,
             final Properties configProps,
             final List<String> streams) {
-        if (!isUsingEfoRecordPublisher(configProps) || 
isNoneEfoRegistrationType(configProps)) {
-            return;
-        }
-
-        for (String stream : streams) {
-            try {
-                registrar.deregisterStreamConsumer(stream);
-            } catch (ExecutionException ex) {
-                throw new FlinkKinesisStreamConsumerRegistrarException(
-                        "Error deregistering stream: " + stream, ex);
-            } catch (InterruptedException ex) {
-                Thread.currentThread().interrupt();
-                throw new FlinkKinesisStreamConsumerRegistrarException(
-                        "Error registering stream: " + stream, ex);
+        if (isConsumerDeregistrationRequired(configProps)) {
+            for (String stream : streams) {
+                try {
+                    registrar.deregisterStreamConsumer(stream);
+                } catch (ExecutionException ex) {
+                    throw new FlinkKinesisStreamConsumerRegistrarException(
+                            "Error deregistering stream: " + stream, ex);
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    throw new FlinkKinesisStreamConsumerRegistrarException(
+                            "Error registering stream: " + stream, ex);
+                }
             }
         }
     }
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/StreamConsumerRegistrarUtilTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/StreamConsumerRegistrarUtilTest.java
index 7569b00..a3dd47d 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/StreamConsumerRegistrarUtilTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/StreamConsumerRegistrarUtilTest.java
@@ -25,13 +25,16 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 
+import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.EAGER;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_CONSUMER_NAME;
+import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_REGISTRATION_TYPE;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.EFO;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.efoConsumerArn;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
 /** Tests for {@link StreamConsumerRegistrar}. */
@@ -39,9 +42,7 @@ public class StreamConsumerRegistrarUtilTest {
 
     @Test
     public void testRegisterStreamConsumers() throws Exception {
-        Properties configProps = new Properties();
-        configProps.setProperty(EFO_CONSUMER_NAME, "consumer-name");
-
+        Properties configProps = getDefaultConfiguration();
         StreamConsumerRegistrar registrar = 
mock(StreamConsumerRegistrar.class);
         when(registrar.registerStreamConsumer("stream-1", "consumer-name"))
                 .thenReturn("stream-1-consumer-arn");
@@ -57,10 +58,8 @@ public class StreamConsumerRegistrarUtilTest {
 
     @Test
     public void testDeregisterStreamConsumersMissingStreamArn() throws 
Exception {
-        Properties configProps = new Properties();
+        Properties configProps = getDefaultConfiguration();
         configProps.setProperty(RECORD_PUBLISHER_TYPE, EFO.name());
-        configProps.setProperty(EFO_CONSUMER_NAME, "consumer-name");
-
         List<String> streams = Arrays.asList("stream-1", "stream-2");
         StreamConsumerRegistrar registrar = 
mock(StreamConsumerRegistrar.class);
 
@@ -69,4 +68,23 @@ public class StreamConsumerRegistrarUtilTest {
         verify(registrar).deregisterStreamConsumer("stream-1");
         verify(registrar).deregisterStreamConsumer("stream-2");
     }
+
+    @Test
+    public void 
testDeregisterStreamConsumersOnlyDeregistersEFOLazilyInitializedConsumers() {
+        Properties configProps = getDefaultConfiguration();
+        configProps.setProperty(RECORD_PUBLISHER_TYPE, EFO.name());
+        configProps.put(EFO_REGISTRATION_TYPE, EAGER.name());
+        List<String> streams = Arrays.asList("stream-1");
+        StreamConsumerRegistrar registrar = 
mock(StreamConsumerRegistrar.class);
+
+        StreamConsumerRegistrarUtil.deregisterStreamConsumers(registrar, 
configProps, streams);
+
+        verifyZeroInteractions(registrar);
+    }
+
+    private Properties getDefaultConfiguration() {
+        Properties configProps = new Properties();
+        configProps.setProperty(EFO_CONSUMER_NAME, "consumer-name");
+        return configProps;
+    }
 }

Reply via email to