This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new 76d791f [FLINK-33181][Connectors/Kinesis] Not validate source & sink
options when creating dynamic tables (#105)
76d791f is described below
commit 76d791fe72c328fc80f3efe26f57a33e8b67dc23
Author: Khanh Vu <[email protected]>
AuthorDate: Thu Oct 19 12:16:50 2023 +0100
[FLINK-33181][Connectors/Kinesis] Not validate source & sink options when
creating dynamic tables (#105)
Co-authored-by: Khanh <[email protected]>
---
.../util/KinesisStreamsConnectorOptionsUtils.java | 9 +++-
.../table/KinesisDynamicTableSinkFactoryTest.java | 48 +++++++++++++++++++
.../kinesis/table/KinesisConsumerOptionsUtil.java | 6 ++-
.../table/KinesisDynamicTableFactoryTest.java | 55 ++++++++++++++++++++++
4 files changed, 115 insertions(+), 3 deletions(-)
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java
index 4b30fe0..3d74361 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java
@@ -62,6 +62,8 @@ public class KinesisStreamsConnectorOptionsUtils {
/** Key for accessing kinesisAsyncClient properties. */
public static final String KINESIS_CLIENT_PROPERTIES_KEY =
"sink.client.properties";
+ public static final String CONSUMER_PREFIX = "scan.";
+
private final AsyncClientOptionsUtils asyncClientOptionsUtils;
private final AsyncSinkConfigurationValidator
asyncSinkconfigurationValidator;
private final Map<String, String> resolvedOptions;
@@ -75,8 +77,8 @@ public class KinesisStreamsConnectorOptionsUtils {
private static final String[] NON_VALIDATED_PREFIXES =
new String[] {
AWSOptionUtils.AWS_PROPERTIES_PREFIX,
- AsyncClientOptionsUtils.SINK_CLIENT_PREFIX,
- KinesisProducerOptionsMapper.KINESIS_PRODUCER_PREFIX
+ KinesisProducerOptionsMapper.PRODUCER_PREFIX,
+ CONSUMER_PREFIX
};
public KinesisStreamsConnectorOptionsUtils(
@@ -121,6 +123,9 @@ public class KinesisStreamsConnectorOptionsUtils {
public static class KinesisProducerOptionsMapper {
private static final Logger LOG =
LoggerFactory.getLogger(KinesisProducerOptionsMapper.class);
+
+ public static final String PRODUCER_PREFIX = "sink.";
+
/** prefix for deprecated producer options fallback keys. */
public static final String KINESIS_PRODUCER_PREFIX = "sink.producer.";
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
index ef939cd..edc2e6e 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
@@ -48,6 +48,7 @@ import static
org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MA
import static
org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MAX_IN_FLIGHT_REQUESTS;
import static
org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.SINK_FAIL_ON_ERROR;
import static
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link KinesisDynamicSink} created by {@link
KinesisDynamicTableSinkFactory}. */
class KinesisDynamicTableSinkFactoryTest {
@@ -173,6 +174,46 @@ class KinesisDynamicTableSinkFactoryTest {
Assertions.assertThat(sinkFunction).isInstanceOf(KinesisStreamsSink.class);
}
+ @Test
+ void testGoodTableSinkForNonPartitionedTableWithSinkAndConsumerOptions() {
+ ResolvedSchema sinkSchema = defaultSinkSchema();
+ Map<String, String> tableOptions =
defaultTableOptionsWithSinkAndConsumerOptions().build();
+
+ // Construct actual DynamicTableSink using FactoryUtil
+ KinesisDynamicSink actualSink =
+ (KinesisDynamicSink) createTableSink(sinkSchema, tableOptions);
+
+ // Construct expected DynamicTableSink using factory under test
+ KinesisDynamicSink expectedSink =
+ getDefaultSinkBuilder()
+
.setConsumedDataType(sinkSchema.toPhysicalRowDataType())
+ .setStream(STREAM_NAME)
+
.setKinesisClientProperties(defaultProducerProperties())
+ .setEncodingFormat(new
TestFormatFactory.EncodingFormatMock(","))
+ .setPartitioner(new
RandomKinesisPartitionKeyGenerator<>())
+ .build();
+
+ Assertions.assertThat(actualSink).isEqualTo(expectedSink);
+
+ // verify the produced sink
+ DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
+ actualSink.getSinkRuntimeProvider(new
SinkRuntimeProviderContext(false));
+ Sink<RowData> sinkFunction = ((SinkV2Provider)
sinkFunctionProvider).createSink();
+
Assertions.assertThat(sinkFunction).isInstanceOf(KinesisStreamsSink.class);
+ }
+
+ @Test
+ void testBadTableSinkWithUnsupportedOptions() {
+ ResolvedSchema sinkSchema = defaultSinkSchema();
+ Map<String, String> tableOptions =
+ defaultTableOptions().withTableOption("invalid.option",
"some_value").build();
+
+ assertThatThrownBy(() -> createTableSink(sinkSchema, tableOptions))
+ .hasCauseInstanceOf(ValidationException.class)
+ .hasStackTraceContaining("Unsupported options:")
+ .hasStackTraceContaining("invalid.option");
+ }
+
@Test
void testGoodTableSinkForNonPartitionedTableWithProducerOptions() {
ResolvedSchema sinkSchema = defaultSinkSchema();
@@ -258,6 +299,13 @@ class KinesisDynamicTableSinkFactoryTest {
.withTableOption(FLUSH_BUFFER_TIMEOUT.key(), "1000");
}
+ private TableOptionsBuilder
defaultTableOptionsWithSinkAndConsumerOptions() {
+ return defaultTableOptionsWithSinkOptions()
+ .withTableOption("scan.stream.initpos", "AT_TIMESTAMP")
+ .withTableOption("scan.stream.initpos-timestamp-format",
"yyyy-MM-dd'T'HH:mm:ss")
+ .withTableOption("scan.stream.initpos-timestamp",
"2022-10-22T12:00:00");
+ }
+
private TableOptionsBuilder defaultTableOptionsWithDeprecatedOptions() {
return defaultTableOptions()
.withTableOption("sink.producer.record-max-buffered-time",
"1000")
diff --git
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisConsumerOptionsUtil.java
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisConsumerOptionsUtil.java
index c3c2307..6bdb568 100644
---
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisConsumerOptionsUtil.java
+++
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisConsumerOptionsUtil.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.table;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.aws.table.util.AWSOptionUtils;
+import
org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import java.util.Arrays;
@@ -60,7 +61,10 @@ public class KinesisConsumerOptionsUtil extends
AWSOptionUtils {
@Override
public List<String> getNonValidatedPrefixes() {
- return Arrays.asList(AWS_PROPERTIES_PREFIX, CONSUMER_PREFIX);
+ return Arrays.asList(
+ AWS_PROPERTIES_PREFIX,
+ CONSUMER_PREFIX,
+
KinesisStreamsConnectorOptionsUtils.KinesisProducerOptionsMapper.PRODUCER_PREFIX);
}
@Override
diff --git
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactoryTest.java
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactoryTest.java
index bf1a6dc..0309af9 100644
---
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactoryTest.java
+++
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactoryTest.java
@@ -26,6 +26,7 @@ import
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
import org.apache.flink.streaming.connectors.kinesis.util.UniformShardAssigner;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.WatermarkSpec;
@@ -59,6 +60,7 @@ import static
org.apache.flink.streaming.connectors.kinesis.table.RowDataKinesis
import static
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/**
* Test for {@link KinesisDynamicSource} and {@link KinesisDynamicSink}
created by {@link
@@ -245,6 +247,49 @@ public class KinesisDynamicTableFactoryTest extends
TestLogger {
assertThat(kinesisConsumer.getShardAssigner().getClass().equals(defaultShardAssignerClass));
}
+ @Test
+ public void testGoodTableSourceWithSinkOptions() {
+ ResolvedSchema sourceSchema = defaultSourceSchema();
+ Map<String, String> tableOptions =
defaultTableOptionsWithSinkOptions().build();
+
+ // Construct actual DynamicTableSource using FactoryUtil
+ KinesisDynamicSource actualSource =
+ (KinesisDynamicSource) createTableSource(sourceSchema,
tableOptions);
+
+ // Construct expected DynamicTableSink using factory under test
+ KinesisDynamicSource expectedSource =
+ new KinesisDynamicSource(
+ sourceSchema.toPhysicalRowDataType(),
+ STREAM_NAME,
+ DEFAULT_SHARD_ASSIGNER_ID,
+ defaultConsumerProperties(),
+ new TestFormatFactory.DecodingFormatMock(",", true));
+
+ // verify that the constructed DynamicTableSink is as expected
+ assertThat(actualSource).isEqualTo(expectedSource);
+
+ // verify produced source
+ ScanTableSource.ScanRuntimeProvider functionProvider =
+
actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+ SourceFunction<RowData> sourceFunction =
+ as(functionProvider,
SourceFunctionProvider.class).createSourceFunction();
+ assertThat(sourceFunction).isInstanceOf(FlinkKinesisConsumer.class);
+ }
+
+ @Test
+ public void testBadTableSourceWithUnsupportedOptions() {
+ ResolvedSchema sourceSchema = defaultSourceSchema();
+ Map<String, String> tableOptions =
+ defaultTableOptionsWithSinkOptions()
+ .withTableOption("invalid.option", "some_value")
+ .build();
+
+ assertThatThrownBy(() -> createTableSource(sourceSchema, tableOptions))
+ .hasCauseInstanceOf(ValidationException.class)
+ .hasStackTraceContaining("Unsupported options:")
+ .hasStackTraceContaining("invalid.option");
+ }
+
//
--------------------------------------------------------------------------------------------
// Utilities
//
--------------------------------------------------------------------------------------------
@@ -292,6 +337,16 @@ public class KinesisDynamicTableFactoryTest extends
TestLogger {
.withFormatOption(TestFormatFactory.FAIL_ON_MISSING, "true");
}
+ private TableOptionsBuilder defaultTableOptionsWithSinkOptions() {
+ return defaultTableOptions()
+ .withTableOption("sink.fail-on-error", "true")
+ .withTableOption("sink.batch.max-size", "100")
+ .withTableOption("sink.requests.max-inflight", "100")
+ .withTableOption("sink.requests.max-buffered", "100")
+ .withTableOption("sink.flush-buffer.size", "1000")
+ .withTableOption("sink.flush-buffer.timeout", "1000");
+ }
+
private TableOptionsBuilder defaultSinkTableOptions() {
String connector = KinesisDynamicTableFactory.IDENTIFIER;
String format = TestFormatFactory.IDENTIFIER;