This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit b61b1366ae9fe98468fca9533ed75c88ce550dee
Author: MOBIN-F <18814118...@163.com>
AuthorDate: Wed Apr 9 17:05:49 2025 +0800

    [FLINK-36611][pipeline-connector/kafka] Rename sink.debezium-json.include-schema.enabled to debezium-json.include-schema.enabled
---
 .../docs/connectors/pipeline-connectors/kafka.md          |  4 ++--
 docs/content/docs/connectors/pipeline-connectors/kafka.md |  4 ++--
 .../connectors/kafka/json/ChangeLogJsonFormatFactory.java | 15 ++++++++-------
 .../cdc/connectors/kafka/sink/KafkaDataSinkFactory.java   |  4 ++--
 .../cdc/connectors/kafka/sink/KafkaDataSinkOptions.java   |  4 ++--
 .../debezium/DebeziumJsonSerializationSchemaTest.java     |  8 +++-----
 .../flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java   |  2 +-
 7 files changed, 20 insertions(+), 21 deletions(-)
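
For review context, the user-facing effect of this commit is only a shorter option key in the sink configuration. Below is a minimal, self-contained Java sketch of the before/after keys; the key strings come from this commit, while the surrounding map wiring is illustrative only (the real pipeline reads these keys from the YAML definition exercised in MysqlToKafkaE2eITCase further down).

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: sink options modeled as a flat key/value map.
public class IncludeSchemaKeySketch {
    public static void main(String[] args) {
        Map<String, String> sinkOptions = new HashMap<>();
        // Before this commit the option key was:
        //   "sink.debezium-json.include-schema.enabled"
        // After this commit it is:
        sinkOptions.put("debezium-json.include-schema.enabled", "true");
        System.out.println(sinkOptions); // {debezium-json.include-schema.enabled=true}
    }
}
```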

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
index 419499b75..bc655a5b4 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
@@ -151,7 +151,7 @@ Pipeline Connector Options
       <td>Custom mapping from upstream table names to downstream Kafka topic names. Each mapping is separated by `;`; the upstream TableId and the downstream Kafka topic name are separated by `:`. For example, `sink.tableId-to-topic.mapping` can be set to `mydb.mytable1:topic1;mydb.mytable2:topic2`. </td>
     </tr>
     <tr>
-      <td>sink.debezium-json.include-schema.enabled</td>
+      <td>debezium-json.include-schema.enabled</td>
       <td>optional</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
@@ -187,7 +187,7 @@ Pipeline Connector Options
   }
 }
 ```
-When `sink.debezium-json.include-schema.enabled=true`, an output example is as follows:
+When `debezium-json.include-schema.enabled=true`, an output example is as follows:
 ```json
 {
   "schema":{
diff --git a/docs/content/docs/connectors/pipeline-connectors/kafka.md b/docs/content/docs/connectors/pipeline-connectors/kafka.md
index b9cf96e40..b94efc149 100644
--- a/docs/content/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content/docs/connectors/pipeline-connectors/kafka.md
@@ -149,7 +149,7 @@ Pipeline Connector Options
       <td>Custom table mappings for each table from upstream tableId to downstream Kafka topic. Each mapping is separated by `;`; the upstream tableId and the downstream Kafka topic are separated by `:`. For example, we can set `sink.tableId-to-topic.mapping` to `mydb.mytable1:topic1;mydb.mytable2:topic2`. </td>
     </tr>
     <tr>
-      <td>sink.debezium-json.include-schema.enabled</td>
+      <td>debezium-json.include-schema.enabled</td>
       <td>optional</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
@@ -185,7 +185,7 @@ An output example is:
   }
 }
 ```
-When `sink.debezium-json.include-schema.enabled` is true, the output format will be:
+When `debezium-json.include-schema.enabled` is true, the output format will be:
 ```json
 {
   "schema":{
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
index 3b55a05ba..a6dcdcd1e 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
@@ -20,7 +20,6 @@ package org.apache.flink.cdc.connectors.kafka.json;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.cdc.common.event.Event;
-import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.cdc.connectors.kafka.json.canal.CanalJsonSerializationSchema;
 import org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonSerializationSchema;
 import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
@@ -31,7 +30,7 @@ import org.apache.flink.formats.json.JsonFormatOptionsUtil;
 
 import java.time.ZoneId;
 
-import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED;
+import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED;
 import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
 import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
 
@@ -52,13 +51,15 @@ public class ChangeLogJsonFormatFactory {
      */
     public static SerializationSchema<Event> createSerializationSchema(
             ReadableConfig formatOptions, JsonSerializationType type, ZoneId zoneId) {
+        final String prefix = type.toString() + ".";
         boolean isIncludedDebeziumSchema =
                 Boolean.parseBoolean(
-                        formatOptions.toMap().get(SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED.key()));
-        Preconditions.checkArgument(
-                !(isIncludedDebeziumSchema && !type.equals(JsonSerializationType.DEBEZIUM_JSON)),
-                SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED.key()
-                        + " is only supported when using debezium-json.");
+                        formatOptions
+                                .toMap()
+                                .get(
+                                        DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED
+                                                .key()
+                                                .substring(prefix.length())));
 
         TimestampFormat timestampFormat = JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
         JsonFormatOptions.MapNullKeyMode mapNullKeyMode =
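
The refactor above drops the `Preconditions` guard and instead resolves the option through a format-scoped key: the factory's `formatOptions` no longer carry the format identifier prefix, so the code strips `type.toString() + "."` from the full option key before the map lookup. A self-contained sketch of that lookup, with the key and prefix values taken from the diff and a plain `Map` standing in for `formatOptions.toMap()`:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the key scoping above; assumes format options arrive without
// their "debezium-json." prefix, as the new lookup in the diff implies.
public class KeyScopingSketch {
    public static void main(String[] args) {
        Map<String, String> formatOptions = new HashMap<>();
        formatOptions.put("include-schema.enabled", "true");

        String fullKey = "debezium-json.include-schema.enabled"; // DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED.key()
        String prefix = "debezium-json" + ".";                   // type.toString() + "."
        String scopedKey = fullKey.substring(prefix.length());   // "include-schema.enabled"

        boolean includeSchema = Boolean.parseBoolean(formatOptions.get(scopedKey));
        System.out.println(includeSchema); // true
    }
}
```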
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
index 05090b30b..9d3f605cb 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
@@ -35,12 +35,12 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 
+import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.KEY_FORMAT;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PARTITION_STRATEGY;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PROPERTIES_PREFIX;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_CUSTOM_HEADER;
-import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.TOPIC;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.VALUE_FORMAT;
@@ -132,7 +132,7 @@ public class KafkaDataSinkFactory implements DataSinkFactory {
         options.add(SINK_CUSTOM_HEADER);
         options.add(KafkaDataSinkOptions.DELIVERY_GUARANTEE);
         options.add(SINK_TABLE_ID_TO_TOPIC_MAPPING);
-        options.add(SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED);
+        options.add(DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED);
         return options;
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
index a8eeed8cf..d6176a073 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
@@ -101,8 +101,8 @@ public class KafkaDataSinkOptions {
                                             ". For example, we can set 
'sink.tableId-to-topic.mappingg' like 
'mydb.mytable1:topic1;mydb.mytable2:topic2'.")
                                     .build());
 
-    public static final ConfigOption<Boolean> SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED =
-            key("sink.debezium-json.include-schema.enabled")
+    public static final ConfigOption<Boolean> DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED =
+            key("debezium-json.include-schema.enabled")
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
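
For reference, here is a hedged sketch of how such an option can be declared and read with vanilla Flink's `ConfigOptions` builder; Flink CDC's `KafkaDataSinkOptions` uses its own `key(...)` helper (see the diff above), so treat this as illustrative rather than the project's exact API:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

// Declaring and reading the renamed option with vanilla Flink config classes.
public class OptionSketch {
    static final ConfigOption<Boolean> DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED =
            ConfigOptions.key("debezium-json.include-schema.enabled")
                    .booleanType()
                    .defaultValue(false);

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED, true);
        System.out.println(conf.get(DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED)); // true
    }
}
```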
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
index 4c9933bf2..6108f24d8 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
@@ -49,8 +49,6 @@ import java.time.ZoneId;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED;
-
 /** Tests for {@link DebeziumJsonSerializationSchema}. */
 class DebeziumJsonSerializationSchemaTest {
 
@@ -145,7 +143,7 @@ class DebeziumJsonSerializationSchemaTest {
                 JacksonMapperFactory.createObjectMapper()
                         .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
         Map<String, String> properties = new HashMap<>();
-        properties.put(SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED.key(), "true");
+        properties.put("include-schema.enabled", "true");
         Configuration configuration = Configuration.fromMap(properties);
         SerializationSchema<Event> serializationSchema =
                 ChangeLogJsonFormatFactory.createSerializationSchema(
@@ -205,7 +203,7 @@ class DebeziumJsonSerializationSchemaTest {
                         DataTypes.STRING());
 
         CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
-        Assertions.assertNull(serializationSchema.serialize(createTableEvent));
+        Assertions.assertThat(serializationSchema.serialize(createTableEvent)).isNull();
         BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
         // insert
         DataChangeEvent insertEvent1 =
@@ -244,6 +242,6 @@ class DebeziumJsonSerializationSchemaTest {
                 mapper.readTree(
                         "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"boolean\",\"optional\":true,\"doc\":\"_boolean comment\",\"field\":\"_boolean\"},{\"type\":\"bytes\",\"optional\":true,\"name\":\"io.debezium.data.Bits\",\"version\":1,\"parameters\":{\"length\":\"3\"},\"field\":\"_binary\"},{\"type\":\"bytes\",\"optional\":true,\"name\":\"io.debezium.data.Bits\",\"version\":1,\"parameters\":{\"length\":\"10\"},\"field\":\"_varbinary\"},{\"t [...]
         JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
-        Assertions.assertEquals(expected, actual);
+        Assertions.assertThat(actual).isEqualTo(expected);
     }
 }
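
Alongside the rename, the test migrates from JUnit's `Assertions.assertNull`/`assertEquals` to AssertJ's fluent `assertThat`. A minimal sketch of the equivalent assertions, assuming AssertJ on the test classpath (which the `Assertions.assertThat` calls in the diff imply):

```java
import static org.assertj.core.api.Assertions.assertThat;

// JUnit-to-AssertJ equivalents used in the test above.
public class AssertStyleSketch {
    public static void main(String[] args) {
        byte[] serialized = null;               // e.g. serialize(createTableEvent)
        assertThat(serialized).isNull();        // was: Assertions.assertNull(serialized)

        String expected = "{}";
        String actual = "{}";                   // e.g. JSON trees parsed by Jackson
        assertThat(actual).isEqualTo(expected); // was: Assertions.assertEquals(expected, actual)
    }
}
```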
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java
index e85924558..b612f682d 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java
@@ -319,7 +319,7 @@ class MysqlToKafkaE2eITCase extends PipelineTestEnvironment {
                                 + "  type: kafka\n"
                                 + "  properties.bootstrap.servers: 
kafka:9092\n"
                                 + "  topic: %s\n"
-                                + "  
sink.debezium-json.include-schema.enabled: true\n"
+                                + "  debezium-json.include-schema.enabled: 
true\n"
                                 + "\n"
                                 + "pipeline:\n"
                                 + "  parallelism: %d",
