xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457246071
##########
File path:
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutPropertiesTest.java
##########
@@ -0,0 +1,120 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link FanOutProperties}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FanOutProperties.class)
+public class FanOutPropertiesTest extends TestLogger {
+ @Rule
+ private ExpectedException exception = ExpectedException.none();
+
+ @Test
+ public void testPollingRecordPublisher() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Only efo record publisher can register
a FanOutProperties.");
+
+ Properties testConfig = TestUtils.getStandardProperties();
+
testConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
RecordPublisherType.POLLING.toString());
+
+ new FanOutProperties(testConfig, new ArrayList<>());
+ }
+
+ @Test
+ public void testEagerStrategyWithConsumerName() {
+ String fakedConsumerName = "fakedconsumername";
+ Properties testConfig = TestUtils.getStandardProperties();
+
testConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
RecordPublisherType.EFO.toString());
+
testConfig.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME,
fakedConsumerName);
+ FanOutProperties fanOutProperties = new
FanOutProperties(testConfig, new ArrayList<>());
+ assertEquals(fanOutProperties.getConsumerName(),
fakedConsumerName);
+ }
+
+ @Test
+ public void testEagerStrategyWithNoConsumerName() {
+ String msg = "No valid enhanced fan-out consumer name is set
through " + ConsumerConfigConstants.EFO_CONSUMER_NAME;
+
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage(msg);
+
+ Properties testConfig = TestUtils.getStandardProperties();
+
testConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
RecordPublisherType.EFO.toString());
+ new FanOutProperties(testConfig, new ArrayList<>());
+ }
+
+ @Test
+ public void testNoneStrategyWithStreams() {
+ String fakedStream1 = "fakedstream1";
Review comment:
I changed the codes accordingly.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]