This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new 76d791f  [FLINK-33181][Connectors/Kinesis] Not validate source & sink 
options when creating dynamic tables (#105)
76d791f is described below

commit 76d791fe72c328fc80f3efe26f57a33e8b67dc23
Author: Khanh Vu <[email protected]>
AuthorDate: Thu Oct 19 12:16:50 2023 +0100

    [FLINK-33181][Connectors/Kinesis] Not validate source & sink options when 
creating dynamic tables (#105)
    
    Co-authored-by: Khanh <[email protected]>
---
 .../util/KinesisStreamsConnectorOptionsUtils.java  |  9 +++-
 .../table/KinesisDynamicTableSinkFactoryTest.java  | 48 +++++++++++++++++++
 .../kinesis/table/KinesisConsumerOptionsUtil.java  |  6 ++-
 .../table/KinesisDynamicTableFactoryTest.java      | 55 ++++++++++++++++++++++
 4 files changed, 115 insertions(+), 3 deletions(-)

diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java
index 4b30fe0..3d74361 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java
@@ -62,6 +62,8 @@ public class KinesisStreamsConnectorOptionsUtils {
     /** Key for accessing kinesisAsyncClient properties. */
     public static final String KINESIS_CLIENT_PROPERTIES_KEY = 
"sink.client.properties";
 
+    public static final String CONSUMER_PREFIX = "scan.";
+
     private final AsyncClientOptionsUtils asyncClientOptionsUtils;
     private final AsyncSinkConfigurationValidator 
asyncSinkconfigurationValidator;
     private final Map<String, String> resolvedOptions;
@@ -75,8 +77,8 @@ public class KinesisStreamsConnectorOptionsUtils {
     private static final String[] NON_VALIDATED_PREFIXES =
             new String[] {
                 AWSOptionUtils.AWS_PROPERTIES_PREFIX,
-                AsyncClientOptionsUtils.SINK_CLIENT_PREFIX,
-                KinesisProducerOptionsMapper.KINESIS_PRODUCER_PREFIX
+                KinesisProducerOptionsMapper.PRODUCER_PREFIX,
+                CONSUMER_PREFIX
             };
 
     public KinesisStreamsConnectorOptionsUtils(
@@ -121,6 +123,9 @@ public class KinesisStreamsConnectorOptionsUtils {
     public static class KinesisProducerOptionsMapper {
         private static final Logger LOG =
                 LoggerFactory.getLogger(KinesisProducerOptionsMapper.class);
+
+        public static final String PRODUCER_PREFIX = "sink.";
+
         /** prefix for deprecated producer options fallback keys. */
         public static final String KINESIS_PRODUCER_PREFIX = "sink.producer.";
 
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
index ef939cd..edc2e6e 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
@@ -48,6 +48,7 @@ import static 
org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MA
 import static 
org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MAX_IN_FLIGHT_REQUESTS;
 import static 
org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.SINK_FAIL_ON_ERROR;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link KinesisDynamicSink} created by {@link 
KinesisDynamicTableSinkFactory}. */
 class KinesisDynamicTableSinkFactoryTest {
@@ -173,6 +174,46 @@ class KinesisDynamicTableSinkFactoryTest {
         
Assertions.assertThat(sinkFunction).isInstanceOf(KinesisStreamsSink.class);
     }
 
+    @Test
+    void testGoodTableSinkForNonPartitionedTableWithSinkAndConsumerOptions() {
+        ResolvedSchema sinkSchema = defaultSinkSchema();
+        Map<String, String> tableOptions = 
defaultTableOptionsWithSinkAndConsumerOptions().build();
+
+        // Construct actual DynamicTableSink using FactoryUtil
+        KinesisDynamicSink actualSink =
+                (KinesisDynamicSink) createTableSink(sinkSchema, tableOptions);
+
+        // Construct expected DynamicTableSink using factory under test
+        KinesisDynamicSink expectedSink =
+                getDefaultSinkBuilder()
+                        
.setConsumedDataType(sinkSchema.toPhysicalRowDataType())
+                        .setStream(STREAM_NAME)
+                        
.setKinesisClientProperties(defaultProducerProperties())
+                        .setEncodingFormat(new 
TestFormatFactory.EncodingFormatMock(","))
+                        .setPartitioner(new 
RandomKinesisPartitionKeyGenerator<>())
+                        .build();
+
+        Assertions.assertThat(actualSink).isEqualTo(expectedSink);
+
+        // verify the produced sink
+        DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
+                actualSink.getSinkRuntimeProvider(new 
SinkRuntimeProviderContext(false));
+        Sink<RowData> sinkFunction = ((SinkV2Provider) 
sinkFunctionProvider).createSink();
+        
Assertions.assertThat(sinkFunction).isInstanceOf(KinesisStreamsSink.class);
+    }
+
+    @Test
+    void testBadTableSinkWithUnsupportedOptions() {
+        ResolvedSchema sinkSchema = defaultSinkSchema();
+        Map<String, String> tableOptions =
+                defaultTableOptions().withTableOption("invalid.option", 
"some_value").build();
+
+        assertThatThrownBy(() -> createTableSink(sinkSchema, tableOptions))
+                .hasCauseInstanceOf(ValidationException.class)
+                .hasStackTraceContaining("Unsupported options:")
+                .hasStackTraceContaining("invalid.option");
+    }
+
     @Test
     void testGoodTableSinkForNonPartitionedTableWithProducerOptions() {
         ResolvedSchema sinkSchema = defaultSinkSchema();
@@ -258,6 +299,13 @@ class KinesisDynamicTableSinkFactoryTest {
                 .withTableOption(FLUSH_BUFFER_TIMEOUT.key(), "1000");
     }
 
+    private TableOptionsBuilder 
defaultTableOptionsWithSinkAndConsumerOptions() {
+        return defaultTableOptionsWithSinkOptions()
+                .withTableOption("scan.stream.initpos", "AT_TIMESTAMP")
+                .withTableOption("scan.stream.initpos-timestamp-format", 
"yyyy-MM-dd'T'HH:mm:ss")
+                .withTableOption("scan.stream.initpos-timestamp", 
"2022-10-22T12:00:00");
+    }
+
     private TableOptionsBuilder defaultTableOptionsWithDeprecatedOptions() {
         return defaultTableOptions()
                 .withTableOption("sink.producer.record-max-buffered-time", 
"1000")
diff --git 
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisConsumerOptionsUtil.java
 
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisConsumerOptionsUtil.java
index c3c2307..6bdb568 100644
--- 
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisConsumerOptionsUtil.java
+++ 
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisConsumerOptionsUtil.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.table;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connector.aws.table.util.AWSOptionUtils;
+import 
org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 
 import java.util.Arrays;
@@ -60,7 +61,10 @@ public class KinesisConsumerOptionsUtil extends 
AWSOptionUtils {
 
     @Override
     public List<String> getNonValidatedPrefixes() {
-        return Arrays.asList(AWS_PROPERTIES_PREFIX, CONSUMER_PREFIX);
+        return Arrays.asList(
+                AWS_PROPERTIES_PREFIX,
+                CONSUMER_PREFIX,
+                
KinesisStreamsConnectorOptionsUtils.KinesisProducerOptionsMapper.PRODUCER_PREFIX);
     }
 
     @Override
diff --git 
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactoryTest.java
 
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactoryTest.java
index bf1a6dc..0309af9 100644
--- 
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactoryTest.java
+++ 
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactoryTest.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
 import org.apache.flink.streaming.connectors.kinesis.util.UniformShardAssigner;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.WatermarkSpec;
@@ -59,6 +60,7 @@ import static 
org.apache.flink.streaming.connectors.kinesis.table.RowDataKinesis
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
  * Test for {@link KinesisDynamicSource} and {@link KinesisDynamicSink} 
created by {@link
@@ -245,6 +247,49 @@ public class KinesisDynamicTableFactoryTest extends 
TestLogger {
         
assertThat(kinesisConsumer.getShardAssigner().getClass().equals(defaultShardAssignerClass));
     }
 
+    @Test
+    public void testGoodTableSourceWithSinkOptions() {
+        ResolvedSchema sourceSchema = defaultSourceSchema();
+        Map<String, String> tableOptions = 
defaultTableOptionsWithSinkOptions().build();
+
+        // Construct actual DynamicTableSource using FactoryUtil
+        KinesisDynamicSource actualSource =
+                (KinesisDynamicSource) createTableSource(sourceSchema, 
tableOptions);
+
+        // Construct expected DynamicTableSink using factory under test
+        KinesisDynamicSource expectedSource =
+                new KinesisDynamicSource(
+                        sourceSchema.toPhysicalRowDataType(),
+                        STREAM_NAME,
+                        DEFAULT_SHARD_ASSIGNER_ID,
+                        defaultConsumerProperties(),
+                        new TestFormatFactory.DecodingFormatMock(",", true));
+
+        // verify that the constructed DynamicTableSink is as expected
+        assertThat(actualSource).isEqualTo(expectedSource);
+
+        // verify produced source
+        ScanTableSource.ScanRuntimeProvider functionProvider =
+                
actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        SourceFunction<RowData> sourceFunction =
+                as(functionProvider, 
SourceFunctionProvider.class).createSourceFunction();
+        assertThat(sourceFunction).isInstanceOf(FlinkKinesisConsumer.class);
+    }
+
+    @Test
+    public void testBadTableSourceWithUnsupportedOptions() {
+        ResolvedSchema sourceSchema = defaultSourceSchema();
+        Map<String, String> tableOptions =
+                defaultTableOptionsWithSinkOptions()
+                        .withTableOption("invalid.option", "some_value")
+                        .build();
+
+        assertThatThrownBy(() -> createTableSource(sourceSchema, tableOptions))
+                .hasCauseInstanceOf(ValidationException.class)
+                .hasStackTraceContaining("Unsupported options:")
+                .hasStackTraceContaining("invalid.option");
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Utilities
     // 
--------------------------------------------------------------------------------------------
@@ -292,6 +337,16 @@ public class KinesisDynamicTableFactoryTest extends 
TestLogger {
                 .withFormatOption(TestFormatFactory.FAIL_ON_MISSING, "true");
     }
 
+    private TableOptionsBuilder defaultTableOptionsWithSinkOptions() {
+        return defaultTableOptions()
+                .withTableOption("sink.fail-on-error", "true")
+                .withTableOption("sink.batch.max-size", "100")
+                .withTableOption("sink.requests.max-inflight", "100")
+                .withTableOption("sink.requests.max-buffered", "100")
+                .withTableOption("sink.flush-buffer.size", "1000")
+                .withTableOption("sink.flush-buffer.timeout", "1000");
+    }
+
     private TableOptionsBuilder defaultSinkTableOptions() {
         String connector = KinesisDynamicTableFactory.IDENTIFIER;
         String format = TestFormatFactory.IDENTIFIER;

Reply via email to