dannycranmer commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r456415363



##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;

Review comment:
       @xiaolong-sn missing Apache copyright statement

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+       private static final long serialVersionUID = 3204635913413261619L;
+
+       private EFORegistrationType efoRegistrationType;
+       @Nullable
+       private String consumerName;
+       @Nullable
+       private List<String> streamConsumerArns;
+
+       private int subscribeToShardMaxRetries;
+
+       private long subscribeToShardMaxBackoffMillis;
+
+       private long subscribeToShardBaseBackoffMillis;
+
+       private double subscribeToShardExpConstant;
+
+       public FanOutProperties(Properties properties, List<String> streams) {

Review comment:
       @xiaolong-sn missing comment ["All public/protected methods and classes 
must have a 
Javadoc."](https://flink.apache.org/contributing/code-style-and-quality-formatting.html)

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +232,55 @@ public static void 
validateConsumerConfiguration(Properties config) {
                }
        }
 
+       public static void validateEFOConfiguration(Properties config, 
List<String> streams) {
+               if 
(config.containsKey(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)) {
+                       String recordPublisherType = 
config.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE);
+
+                       // specified record publisher type in stream must be 
either EFO or POLLING
+                       try {
+                               
RecordPublisherType.valueOf(recordPublisherType);
+                       } catch (IllegalArgumentException e) {
+                               StringBuilder sb = new StringBuilder();
+                               for (RecordPublisherType rp : 
RecordPublisherType.values()) {
+                                       sb.append(rp.toString()).append(", ");
+                               }
+                               throw new IllegalArgumentException("Invalid 
record publisher type in stream set in config. Valid values are: " + 
sb.toString());

Review comment:
       @xiaolong-sn this validation is general stream consumer validation yet 
is within the `validateEFOConfiguration` method. Suggest moving to a new method 
and invoke from the parent to restrict responsibility. 

##########
File path: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutPropertiesTest.java
##########
@@ -0,0 +1,120 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;

Review comment:
       @xiaolong-sn Missing Apache copyright 

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +232,55 @@ public static void 
validateConsumerConfiguration(Properties config) {
                }
        }
 
+       public static void validateEFOConfiguration(Properties config, 
List<String> streams) {
+               if 
(config.containsKey(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)) {
+                       String recordPublisherType = 
config.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE);
+
+                       // specified record publisher type in stream must be 
either EFO or POLLING
+                       try {
+                               
RecordPublisherType.valueOf(recordPublisherType);
+                       } catch (IllegalArgumentException e) {
+                               StringBuilder sb = new StringBuilder();
+                               for (RecordPublisherType rp : 
RecordPublisherType.values()) {
+                                       sb.append(rp.toString()).append(", ");
+                               }
+                               throw new IllegalArgumentException("Invalid 
record publisher type in stream set in config. Valid values are: " + 
sb.toString());
+                       }
+                       if (RecordPublisherType.valueOf(recordPublisherType) == 
RecordPublisherType.EFO) {
+                               String efoRegistrationType;
+                               if 
(config.containsKey(ConsumerConfigConstants.EFO_REGISTRATION_TYPE)) {
+                                       efoRegistrationType = 
config.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE);
+                                       // specified efo registration type in 
stream must be either LAZY, EAGER or NONE.
+                                       try {
+                                               
EFORegistrationType.valueOf(efoRegistrationType);
+                                       } catch (IllegalArgumentException e) {
+                                               StringBuilder sb = new 
StringBuilder();
+                                               for (EFORegistrationType ert : 
EFORegistrationType.values()) {
+                                                       
sb.append(ert.toString()).append(", ");
+                                               }
+                                               throw new 
IllegalArgumentException("Invalid efo registration type in stream set in 
config. Valid values are: " + sb.toString());
+                                       }
+                               } else {
+                                       efoRegistrationType = 
EFORegistrationType.LAZY.toString();
+                               }
+                               if 
(EFORegistrationType.valueOf(efoRegistrationType) == EFORegistrationType.NONE) {
+                                       //if the registration type is NONE, 
then for each stream there must be an according consumer ARN
+                                       for (String stream : streams) {
+                                               String efoConsumerARNKey = 
ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+                                               if 
(!config.containsKey(efoConsumerARNKey)) {
+                                                       throw new 
IllegalArgumentException("Invalid efo consumer arn settings for not providing " 
+ efoConsumerARNKey);

Review comment:
       @xiaolong-sn nit: This is fine, however consider collecting all missing 
keys rather than fail fast to give the user a single error message

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+       private static final long serialVersionUID = 3204635913413261619L;
+
+       private EFORegistrationType efoRegistrationType;
+       @Nullable
+       private String consumerName;
+       @Nullable
+       private List<String> streamConsumerArns;
+
+       private int subscribeToShardMaxRetries;
+
+       private long subscribeToShardMaxBackoffMillis;
+
+       private long subscribeToShardBaseBackoffMillis;
+
+       private double subscribeToShardExpConstant;
+
+       public FanOutProperties(Properties properties, List<String> streams) {
+               //validate the properties
+               
Preconditions.checkArgument(properties.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()),
 "Only efo record publisher can register a FanOutProperties.");
+               KinesisConfigUtil.validateEFOConfiguration(properties, streams);
+
+               efoRegistrationType = 
EFORegistrationType.valueOf(properties.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE,
 EFORegistrationType.EAGER.toString()));
+               //if efo registration type is EAGER|LAZY, then user should 
explicitly provide a consumer name for each stream.
+               if (efoRegistrationType == EFORegistrationType.EAGER || 
efoRegistrationType == EFORegistrationType.LAZY) {
+                       consumerName = 
properties.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+               } else {
+                       //else users should explicitly provide consumer arns.
+                       streamConsumerArns = new ArrayList<>();
+                       for (String stream:streams) {

Review comment:
       @xiaolong-sn nit: spaces around `:`

##########
File path: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
##########
@@ -180,6 +182,104 @@ public void 
testUnrecognizableCredentialProviderTypeInConfig() {
                KinesisConfigUtil.validateAwsConfiguration(testConfig);
        }
 
+       // 
----------------------------------------------------------------------
+       // validateEFOConfiguration() tests
+       // 
----------------------------------------------------------------------
+
+       @Test
+       public void testNoRecordPublisherTypeInConfig() {
+               Properties testConfig = TestUtils.getStandardProperties();
+
+               KinesisConfigUtil.validateEFOConfiguration(testConfig, new 
ArrayList<>());

Review comment:
       @xiaolong-sn nit: `Collections.emptyList()` is more expressive

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {

Review comment:
       @xiaolong-sn does this need to be `Serializable`? I think the 
`Properties` are serialised which are used to construct this class, and 
therefore this class does not need it.

##########
File path: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutPropertiesTest.java
##########
@@ -0,0 +1,120 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link FanOutProperties}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FanOutProperties.class)
+public class FanOutPropertiesTest extends TestLogger {
+       @Rule
+       private ExpectedException exception = ExpectedException.none();
+
+       @Test
+       public void testPollingRecordPublisher() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Only efo record publisher can register 
a FanOutProperties.");
+
+               Properties testConfig = TestUtils.getStandardProperties();
+               
testConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
RecordPublisherType.POLLING.toString());
+
+               new FanOutProperties(testConfig, new ArrayList<>());
+       }
+
+       @Test
+       public void testEagerStrategyWithConsumerName() {
+               String fakedConsumerName = "fakedconsumername";
+               Properties testConfig = TestUtils.getStandardProperties();
+               
testConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
RecordPublisherType.EFO.toString());
+               
testConfig.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, 
fakedConsumerName);
+               FanOutProperties fanOutProperties = new 
FanOutProperties(testConfig, new ArrayList<>());
+               assertEquals(fanOutProperties.getConsumerName(), 
fakedConsumerName);
+       }
+
+       @Test
+       public void testEagerStrategyWithNoConsumerName() {
+               String msg = "No valid enhanced fan-out consumer name is set 
through " + ConsumerConfigConstants.EFO_CONSUMER_NAME;
+
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage(msg);
+
+               Properties testConfig = TestUtils.getStandardProperties();
+               
testConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
RecordPublisherType.EFO.toString());
+               new FanOutProperties(testConfig, new ArrayList<>());
+       }
+
+       @Test
+       public void testNoneStrategyWithStreams() {
+               String fakedStream1 = "fakedstream1";

Review comment:
       @xiaolong-sn these 5 lines can be reduced to `List<String> streams = 
Arrays.asList("fakedstream1", "fakedstream2");`

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
##########
@@ -137,6 +137,48 @@
        /* Exponential backoff power constant for the describe stream 
operation. */
        private final double describeStreamExpConstant;
 
+       // 
------------------------------------------------------------------------
+       //  registerStream() related performance settings
+       // 
------------------------------------------------------------------------
+       /** Base backoff millis for the register stream operation. */
+       private final long registerStreamBaseBackoffMillis;
+
+       /** Maximum backoff millis for the register stream operation. */
+       private final long registerStreamMaxBackoffMillis;
+
+       /** Exponential backoff power constant for the register stream 
operation. */
+       private final double registerStreamExpConstant;
+
+       /** Maximum retry attempts for the register stream operation. */
+       private final int registerStreamMaxRetries;
+       // 
------------------------------------------------------------------------
+       //  deregisterStream() related performance settings
+       // 
------------------------------------------------------------------------
+       /** Base backoff millis for the deregister stream operation. */
+       private final long deregisterStreamBaseBackoffMillis;
+
+       /** Maximum backoff millis for the deregister stream operation. */
+       private final long deregisterStreamMaxBackoffMillis;
+
+       /** Exponential backoff power constant for the deregister stream 
operation. */
+       private final double deregisterStreamExpConstant;
+
+       /** Maximum retry attempts for the deregister stream operation. */
+       private final int deregisterStreamMaxRetries;
+       // 
------------------------------------------------------------------------
+       //  listStream() related performance settings

Review comment:
       @xiaolong-sn these should be `listStreamConsumers`

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
##########
@@ -137,6 +137,48 @@
        /* Exponential backoff power constant for the describe stream 
operation. */
        private final double describeStreamExpConstant;
 
+       // 
------------------------------------------------------------------------

Review comment:
       @xiaolong-sn as discussed the plan was to implement the de-/registration 
in the `KinesisProxyV2`. Can we please move these properties to a new 
configuration class, similar to the `FanOutProperties`, or merge them all into 
a single configuration class?

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -159,6 +174,54 @@ public static void 
validateConsumerConfiguration(Properties config) {
                validateOptionalPositiveDoubleProperty(config, 
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
                        "Invalid value given for list shards operation backoff 
exponential constant. Must be a valid non-negative double value.");
 
+               validateOptionalPositiveIntProperty(config, 
ConsumerConfigConstants.REGISTER_STREAM_RETRIES,
+                       "Invalid value given for maximum retry attempts for 
register stream operation. Must be a valid non-negative integer value.");
+
+               validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_BASE,
+                       "Invalid value given for register stream operation base 
backoff milliseconds. Must be a valid non-negative long value.");
+
+               validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_MAX,
+                       "Invalid value given for register stream operation max 
backoff milliseconds. Must be a valid non-negative long value.");
+
+               validateOptionalPositiveDoubleProperty(config, 
ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+                       "Invalid value given for register stream operation 
backoff exponential constant. Must be a valid non-negative double value.");
+
+               validateOptionalPositiveIntProperty(config, 
ConsumerConfigConstants.DEREGISTER_STREAM_RETRIES,
+                       "Invalid value given for maximum retry attempts for 
deregister stream operation. Must be a valid non-negative integer value.");
+
+               validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_BASE,
+                       "Invalid value given for deregister stream operation 
base backoff milliseconds. Must be a valid non-negative long value.");
+
+               validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_MAX,
+                       "Invalid value given for deregister stream operation 
max backoff milliseconds. Must be a valid non-negative long value.");
+
+               validateOptionalPositiveDoubleProperty(config, 
ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+                       "Invalid value given for deregister stream operation 
backoff exponential constant. Must be a valid non-negative double value.");
+
+               validateOptionalPositiveIntProperty(config, 
ConsumerConfigConstants.LIST_STREAM_RETRIES,

Review comment:
       @xiaolong-sn should be list stream consumers (for all the new list 
properties)

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+       private static final long serialVersionUID = 3204635913413261619L;
+
+       private EFORegistrationType efoRegistrationType;
+       @Nullable
+       private String consumerName;
+       @Nullable
+       private List<String> streamConsumerArns;
+
+       private int subscribeToShardMaxRetries;
+
+       private long subscribeToShardMaxBackoffMillis;
+
+       private long subscribeToShardBaseBackoffMillis;
+
+       private double subscribeToShardExpConstant;
+
+       public FanOutProperties(Properties properties, List<String> streams) {
+               //validate the properties
+               
Preconditions.checkArgument(properties.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()),
 "Only efo record publisher can register a FanOutProperties.");
+               KinesisConfigUtil.validateEFOConfiguration(properties, streams);
+
+               efoRegistrationType = 
EFORegistrationType.valueOf(properties.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE,
 EFORegistrationType.EAGER.toString()));
+               //if efo registration type is EAGER|LAZY, then user should 
explicitly provide a consumer name for each stream.
+               if (efoRegistrationType == EFORegistrationType.EAGER || 
efoRegistrationType == EFORegistrationType.LAZY) {
+                       consumerName = 
properties.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+               } else {
+                       //else users should explicitly provide consumer arns.
+                       streamConsumerArns = new ArrayList<>();
+                       for (String stream:streams) {
+                               String key = 
ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+                               
streamConsumerArns.add(properties.getProperty(key));
+                       }
+               }
+
+               this.subscribeToShardMaxRetries = Integer.parseInt(
+                       properties.getProperty(
+                               
ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES,
+                               
Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES)));
+               this.subscribeToShardBaseBackoffMillis = Long.parseLong(
+                       
properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE,
+                               
Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE)));
+               this.subscribeToShardMaxBackoffMillis = Long.parseLong(
+                       
properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX,
+                               
Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX)));
+               this.subscribeToShardExpConstant = Double.parseDouble(
+                       
properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT,
+                               
Double.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT)));
+       }
+
+       public void setEfoRegistrationType(EFORegistrationType 
efoRegistrationType) {

Review comment:
       @xiaolong-sn as the setters are not needed, we should delete them and 
make the class immutable (imo). Then we can make the fields `final` (["A good 
general approach is to try and make as many fields of a class final as 
possible"](https://flink.apache.org/contributing/code-style-and-quality-common.html))

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+       private static final long serialVersionUID = 3204635913413261619L;
+
+       private EFORegistrationType efoRegistrationType;
+       @Nullable

Review comment:
       @xiaolong-sn nit: My preference is a new line before each field (before 
the annotations)

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+       private static final long serialVersionUID = 3204635913413261619L;
+
+       private EFORegistrationType efoRegistrationType;
+       @Nullable
+       private String consumerName;
+       @Nullable
+       private List<String> streamConsumerArns;

Review comment:
       @xiaolong-sn having a `List` means we lose the Stream > ARN 
relationship. Can you use a `Map` instead where the key is stream and value is 
ARN?

##########
File path: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
##########
@@ -277,7 +377,7 @@ public void testIllegalValueForInitialTimestampInConfig() {
                
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP");
                
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, 
"-1.0");
 
-               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig, new 
ArrayList<>());

Review comment:
       @xiaolong-sn consider overloading the method for backwards compatibility

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -66,27 +74,35 @@
         **/
        protected static final String THREAD_POOL_SIZE = "ThreadPoolSize";
 
-       /** Default values for RateLimit. **/
+       /**
+        * Default values for RateLimit.
+        **/
        protected static final long DEFAULT_RATE_LIMIT = 100L;
 
-       /** Default value for ThreadingModel. **/
+       /**
+        * Default value for ThreadingModel.
+        **/
        protected static final KinesisProducerConfiguration.ThreadingModel 
DEFAULT_THREADING_MODEL = KinesisProducerConfiguration.ThreadingModel.POOLED;
 
-       /** Default values for ThreadPoolSize. **/
+       /**
+        * Default values for ThreadPoolSize.
+        **/
        protected static final int DEFAULT_THREAD_POOL_SIZE = 10;
 
        /**
         * Validate configuration properties for {@link FlinkKinesisConsumer}.
         */
-       public static void validateConsumerConfiguration(Properties config) {
+       public static void validateConsumerConfiguration(Properties config, 
List<String> streams) {
                checkNotNull(config, "config can not be null");
 
                validateAwsConfiguration(config);
 
+               validateEFOConfiguration(config, streams);

Review comment:
       @xiaolong-sn nit: Camelcase applies to acronyms too, typically 
`validateEFOConfiguration` should be `validateEfoConfiguration`. This applies 
to all instances of fields/methods in the PR

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+       private static final long serialVersionUID = 3204635913413261619L;
+
+       private EFORegistrationType efoRegistrationType;
+       @Nullable
+       private String consumerName;
+       @Nullable
+       private List<String> streamConsumerArns;
+
+       private int subscribeToShardMaxRetries;
+
+       private long subscribeToShardMaxBackoffMillis;
+
+       private long subscribeToShardBaseBackoffMillis;
+
+       private double subscribeToShardExpConstant;
+
+       public FanOutProperties(Properties properties, List<String> streams) {
+               //validate the properties
+               
Preconditions.checkArgument(properties.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()),
 "Only efo record publisher can register a FanOutProperties.");
+               KinesisConfigUtil.validateEFOConfiguration(properties, streams);
+
+               efoRegistrationType = 
EFORegistrationType.valueOf(properties.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE,
 EFORegistrationType.EAGER.toString()));
+               //if efo registration type is EAGER|LAZY, then user should 
explicitly provide a consumer name for each stream.
+               if (efoRegistrationType == EFORegistrationType.EAGER || 
efoRegistrationType == EFORegistrationType.LAZY) {
+                       consumerName = 
properties.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+               } else {
+                       //else users should explicitly provide consumer arns.
+                       streamConsumerArns = new ArrayList<>();
+                       for (String stream:streams) {
+                               String key = 
ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+                               
streamConsumerArns.add(properties.getProperty(key));

Review comment:
       @xiaolong-sn using a map `streamConsumerArns.put(stream, 
properties.getProperty(key));`

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +232,55 @@ public static void 
validateConsumerConfiguration(Properties config) {
                }
        }
 
+       public static void validateEFOConfiguration(Properties config, 
List<String> streams) {
+               if 
(config.containsKey(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)) {

Review comment:
       @xiaolong-sn Consider ["Avoid deep nesting of scopes, by flipping the if 
condition and exiting 
early."](https://flink.apache.org/contributing/code-style-and-quality-common.html)

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+       private static final long serialVersionUID = 3204635913413261619L;
+
+       private EFORegistrationType efoRegistrationType;
+       @Nullable
+       private String consumerName;
+       @Nullable
+       private List<String> streamConsumerArns;
+
+       private int subscribeToShardMaxRetries;
+
+       private long subscribeToShardMaxBackoffMillis;
+
+       private long subscribeToShardBaseBackoffMillis;
+
+       private double subscribeToShardExpConstant;
+
+       public FanOutProperties(Properties properties, List<String> streams) {
+               //validate the properties

Review comment:
       @xiaolong-sn some of your comments might violate the "golden rule", 
["Golden rule: Comment as much as necessary to support code understanding, but 
don’t add redundant 
information."](https://flink.apache.org/contributing/code-style-and-quality-common.html#comments-and-code-readability)

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+       private static final long serialVersionUID = 3204635913413261619L;
+
+       private EFORegistrationType efoRegistrationType;
+       @Nullable
+       private String consumerName;
+       @Nullable
+       private List<String> streamConsumerArns;
+
+       private int subscribeToShardMaxRetries;
+
+       private long subscribeToShardMaxBackoffMillis;
+
+       private long subscribeToShardBaseBackoffMillis;
+
+       private double subscribeToShardExpConstant;
+
+       public FanOutProperties(Properties properties, List<String> streams) {
+               //validate the properties
+               
Preconditions.checkArgument(properties.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()),
 "Only efo record publisher can register a FanOutProperties.");
+               KinesisConfigUtil.validateEFOConfiguration(properties, streams);
+
+               efoRegistrationType = 
EFORegistrationType.valueOf(properties.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE,
 EFORegistrationType.EAGER.toString()));
+               //if efo registration type is EAGER|LAZY, then user should 
explicitly provide a consumer name for each stream.
+               if (efoRegistrationType == EFORegistrationType.EAGER || 
efoRegistrationType == EFORegistrationType.LAZY) {
+                       consumerName = 
properties.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+               } else {
+                       //else users should explicitly provide consumer arns.
+                       streamConsumerArns = new ArrayList<>();
+                       for (String stream:streams) {
+                               String key = 
ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+                               
streamConsumerArns.add(properties.getProperty(key));
+                       }
+               }
+
+               this.subscribeToShardMaxRetries = Integer.parseInt(
+                       properties.getProperty(
+                               
ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES,
+                               
Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES)));
+               this.subscribeToShardBaseBackoffMillis = Long.parseLong(
+                       
properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE,
+                               
Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE)));
+               this.subscribeToShardMaxBackoffMillis = Long.parseLong(
+                       
properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX,
+                               
Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX)));
+               this.subscribeToShardExpConstant = Double.parseDouble(
+                       
properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT,
+                               
Double.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT)));
+       }
+
+       public void setEfoRegistrationType(EFORegistrationType 
efoRegistrationType) {
+               this.efoRegistrationType = efoRegistrationType;
+       }
+
+       public void setConsumerName(@Nullable String consumerName) {
+               this.consumerName = consumerName;
+       }
+
+       public void setStreamConsumerArns(@Nullable List<String> 
streamConsumerArns) {
+               this.streamConsumerArns = streamConsumerArns;
+       }
+
+       public void setSubscribeToShardMaxRetries(int 
subscribeToShardMaxRetries) {
+               this.subscribeToShardMaxRetries = subscribeToShardMaxRetries;
+       }
+
+       public void setSubscribeToShardMaxBackoffMillis(long 
subscribeToShardMaxBackoffMillis) {
+               this.subscribeToShardMaxBackoffMillis = 
subscribeToShardMaxBackoffMillis;
+       }
+
+       public void setSubscribeToShardBaseBackoffMillis(long 
subscribeToShardBaseBackoffMillis) {
+               this.subscribeToShardBaseBackoffMillis = 
subscribeToShardBaseBackoffMillis;
+       }
+
+       public void setSubscribeToShardExpConstant(double 
subscribeToShardExpConstant) {
+               this.subscribeToShardExpConstant = subscribeToShardExpConstant;
+       }
+
+       public EFORegistrationType getEfoRegistrationType() {
+               return efoRegistrationType;
+       }
+
+       @Nullable
+       public String getConsumerName() {
+               return consumerName;
+       }
+
+       @Nullable
+       public List<String> getStreamConsumerArns() {

Review comment:
       @xiaolong-sn when we have a map we can add a method `public String 
getStreamConsumerArn(String stream)`

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +232,55 @@ public static void 
validateConsumerConfiguration(Properties config) {
                }
        }
 
+       public static void validateEFOConfiguration(Properties config, 
List<String> streams) {
+               if 
(config.containsKey(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)) {
+                       String recordPublisherType = 
config.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE);
+
+                       // specified record publisher type in stream must be 
either EFO or POLLING
+                       try {
+                               
RecordPublisherType.valueOf(recordPublisherType);
+                       } catch (IllegalArgumentException e) {
+                               StringBuilder sb = new StringBuilder();
+                               for (RecordPublisherType rp : 
RecordPublisherType.values()) {
+                                       sb.append(rp.toString()).append(", ");
+                               }
+                               throw new IllegalArgumentException("Invalid 
record publisher type in stream set in config. Valid values are: " + 
sb.toString());
+                       }
+                       if (RecordPublisherType.valueOf(recordPublisherType) == 
RecordPublisherType.EFO) {
+                               String efoRegistrationType;
+                               if 
(config.containsKey(ConsumerConfigConstants.EFO_REGISTRATION_TYPE)) {
+                                       efoRegistrationType = 
config.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE);
+                                       // specified efo registration type in 
stream must be either LAZY, EAGER or NONE.
+                                       try {
+                                               
EFORegistrationType.valueOf(efoRegistrationType);
+                                       } catch (IllegalArgumentException e) {
+                                               StringBuilder sb = new 
StringBuilder();

Review comment:
       @xiaolong-sn nit: nit be nicer to build with stream
   
   ```
   String errorMessage = Arrays.stream(EFORegistrationType.values())
       .map(Enum::name).collect(Collectors.joining(", "));
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to