This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 551dfe4  [FLINK-24977][connectors/kafka] Replace ConfigOption lookup in Map<String, String> object with proper lookup in Configuration object
551dfe4 is described below

commit 551dfe4842bbf559d22953f32c567a8a35f131c9
Author: Alexander Preuß <[email protected]>
AuthorDate: Tue Nov 30 15:04:43 2021 +0100

    [FLINK-24977][connectors/kafka] Replace ConfigOption lookup in Map<String, String> object with proper lookup in Configuration object
---
 .../kafka/table/KafkaDynamicTableFactory.java      |  4 +-
 .../kafka/table/KafkaDynamicTableFactoryTest.java  | 44 +++++++++++-----------
 2 files changed, 26 insertions(+), 22 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index b2d5880..1bb4ca8 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -321,7 +321,9 @@ public class KafkaDynamicTableFactory
                 && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
             Configuration configuration = Configuration.fromMap(options);
             String formatName =
-                    
configuration.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT));
+                    configuration
+                            .getOptional(FactoryUtil.FORMAT)
+                            .orElse(configuration.get(VALUE_FORMAT));
             throw new ValidationException(
                     String.format(
                             "The Kafka table '%s' with '%s' format doesn't 
support defining PRIMARY KEY constraint"
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index a0cc3cf..d44fb1f 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -95,12 +95,12 @@ import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.DEBEZIUM_AVRO_CONFLUENT;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /** Abstract test base for {@link KafkaDynamicTableFactory}. */
 public class KafkaDynamicTableFactoryTest extends TestLogger {
@@ -840,16 +840,21 @@ public class KafkaDynamicTableFactoryTest extends 
TestLogger {
         // pk can be defined on cdc table, should pass
         createTableSink(pkSchema, sinkOptions);
 
-        try {
-            createTableSink(pkSchema, getBasicSinkOptions());
-            fail();
-        } catch (Throwable t) {
-            String error =
-                    "The Kafka table 'default.default.t1' with 'test-format' 
format"
-                            + " doesn't support defining PRIMARY KEY 
constraint on the table, because it can't"
-                            + " guarantee the semantic of primary key.";
-            assertEquals(error, t.getCause().getMessage());
-        }
+        assertThatExceptionOfType(ValidationException.class)
+                .isThrownBy(() -> createTableSink(pkSchema, 
getBasicSinkOptions()))
+                .havingRootCause()
+                .withMessage(
+                        "The Kafka table 'default.default.t1' with 
'test-format' format"
+                                + " doesn't support defining PRIMARY KEY 
constraint on the table, because it can't"
+                                + " guarantee the semantic of primary key.");
+
+        assertThatExceptionOfType(ValidationException.class)
+                .isThrownBy(() -> createTableSink(pkSchema, 
getKeyValueOptions()))
+                .havingRootCause()
+                .withMessage(
+                        "The Kafka table 'default.default.t1' with 
'test-format' format"
+                                + " doesn't support defining PRIMARY KEY 
constraint on the table, because it can't"
+                                + " guarantee the semantic of primary key.");
 
         Map<String, String> sourceOptions =
                 getModifiedOptions(
@@ -864,16 +869,13 @@ public class KafkaDynamicTableFactoryTest extends 
TestLogger {
         // pk can be defined on cdc table, should pass
         createTableSource(pkSchema, sourceOptions);
 
-        try {
-            createTableSource(pkSchema, getBasicSourceOptions());
-            fail();
-        } catch (Throwable t) {
-            String error =
-                    "The Kafka table 'default.default.t1' with 'test-format' 
format"
-                            + " doesn't support defining PRIMARY KEY 
constraint on the table, because it can't"
-                            + " guarantee the semantic of primary key.";
-            assertEquals(error, t.getCause().getMessage());
-        }
+        assertThatExceptionOfType(ValidationException.class)
+                .isThrownBy(() -> createTableSource(pkSchema, 
getBasicSourceOptions()))
+                .havingRootCause()
+                .withMessage(
+                        "The Kafka table 'default.default.t1' with 
'test-format' format"
+                                + " doesn't support defining PRIMARY KEY 
constraint on the table, because it can't"
+                                + " guarantee the semantic of primary key.");
     }
 
     // 
--------------------------------------------------------------------------------------------

Reply via email to