This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 551dfe4 [FLINK-24977][connectors/kafka] Replace ConfigOption lookup
in Map<String, String> object with proper lookup in Configuration object
551dfe4 is described below
commit 551dfe4842bbf559d22953f32c567a8a35f131c9
Author: Alexander Preuß <[email protected]>
AuthorDate: Tue Nov 30 15:04:43 2021 +0100
[FLINK-24977][connectors/kafka] Replace ConfigOption lookup in Map<String,
String> object with proper lookup in Configuration object
---
.../kafka/table/KafkaDynamicTableFactory.java | 4 +-
.../kafka/table/KafkaDynamicTableFactoryTest.java | 44 +++++++++++-----------
2 files changed, 26 insertions(+), 22 deletions(-)
diff --git
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index b2d5880..1bb4ca8 100644
---
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -321,7 +321,9 @@ public class KafkaDynamicTableFactory
&& format.getChangelogMode().containsOnly(RowKind.INSERT)) {
Configuration configuration = Configuration.fromMap(options);
String formatName =
-
configuration.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT));
+ configuration
+ .getOptional(FactoryUtil.FORMAT)
+ .orElse(configuration.get(VALUE_FORMAT));
throw new ValidationException(
String.format(
"The Kafka table '%s' with '%s' format doesn't
support defining PRIMARY KEY constraint"
diff --git
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index a0cc3cf..d44fb1f 100644
---
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -95,12 +95,12 @@ import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp
import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.DEBEZIUM_AVRO_CONFLUENT;
import static
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
/** Abstract test base for {@link KafkaDynamicTableFactory}. */
public class KafkaDynamicTableFactoryTest extends TestLogger {
@@ -840,16 +840,21 @@ public class KafkaDynamicTableFactoryTest extends
TestLogger {
// pk can be defined on cdc table, should pass
createTableSink(pkSchema, sinkOptions);
- try {
- createTableSink(pkSchema, getBasicSinkOptions());
- fail();
- } catch (Throwable t) {
- String error =
- "The Kafka table 'default.default.t1' with 'test-format'
format"
- + " doesn't support defining PRIMARY KEY
constraint on the table, because it can't"
- + " guarantee the semantic of primary key.";
- assertEquals(error, t.getCause().getMessage());
- }
+ assertThatExceptionOfType(ValidationException.class)
+ .isThrownBy(() -> createTableSink(pkSchema,
getBasicSinkOptions()))
+ .havingRootCause()
+ .withMessage(
+ "The Kafka table 'default.default.t1' with
'test-format' format"
+ + " doesn't support defining PRIMARY KEY
constraint on the table, because it can't"
+ + " guarantee the semantic of primary key.");
+
+ assertThatExceptionOfType(ValidationException.class)
+ .isThrownBy(() -> createTableSink(pkSchema,
getKeyValueOptions()))
+ .havingRootCause()
+ .withMessage(
+ "The Kafka table 'default.default.t1' with
'test-format' format"
+ + " doesn't support defining PRIMARY KEY
constraint on the table, because it can't"
+ + " guarantee the semantic of primary key.");
Map<String, String> sourceOptions =
getModifiedOptions(
@@ -864,16 +869,13 @@ public class KafkaDynamicTableFactoryTest extends
TestLogger {
// pk can be defined on cdc table, should pass
createTableSource(pkSchema, sourceOptions);
- try {
- createTableSource(pkSchema, getBasicSourceOptions());
- fail();
- } catch (Throwable t) {
- String error =
- "The Kafka table 'default.default.t1' with 'test-format'
format"
- + " doesn't support defining PRIMARY KEY
constraint on the table, because it can't"
- + " guarantee the semantic of primary key.";
- assertEquals(error, t.getCause().getMessage());
- }
+ assertThatExceptionOfType(ValidationException.class)
+ .isThrownBy(() -> createTableSource(pkSchema,
getBasicSourceOptions()))
+ .havingRootCause()
+ .withMessage(
+ "The Kafka table 'default.default.t1' with
'test-format' format"
+ + " doesn't support defining PRIMARY KEY
constraint on the table, because it can't"
+ + " guarantee the semantic of primary key.");
}
//
--------------------------------------------------------------------------------------------