This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit b61b1366ae9fe98468fca9533ed75c88ce550dee
Author: MOBIN-F <18814118...@163.com>
AuthorDate: Wed Apr 9 17:05:49 2025 +0800

    [FLINK-36611][pipeline-connector/kafka] Rename sink.debezium-json.include-schema.enabled to debezium-json.include-schema.enabled
---
 .../docs/connectors/pipeline-connectors/kafka.md          |  4 ++--
 docs/content/docs/connectors/pipeline-connectors/kafka.md |  4 ++--
 .../connectors/kafka/json/ChangeLogJsonFormatFactory.java | 15 ++++++++-------
 .../cdc/connectors/kafka/sink/KafkaDataSinkFactory.java   |  4 ++--
 .../cdc/connectors/kafka/sink/KafkaDataSinkOptions.java   |  4 ++--
 .../debezium/DebeziumJsonSerializationSchemaTest.java     |  8 +++-----
 .../flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java   |  2 +-
 7 files changed, 20 insertions(+), 21 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
index 419499b75..bc655a5b4 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
@@ -151,7 +151,7 @@ Pipeline 连接器配置项
       <td>自定义的上游表名到下游 Kafka Topic 名的映射关系。每个映射关系由 `;` 分割,上游表的 TableId 和下游 Kafka 的 Topic 名由 `:` 分割。举个例子,我们可以配置 `sink.tableId-to-topic.mapping` 的值为 `mydb.mytable1:topic1;mydb.mytable2:topic2`。</td>
     </tr>
     <tr>
-      <td>sink.debezium-json.include-schema.enabled</td>
+      <td>debezium-json.include-schema.enabled</td>
       <td>optional</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
@@ -187,7 +187,7 @@ Pipeline 连接器配置项
 }
 }
 ```
-当`sink.debezium-json.include-schema.enabled=true`时,输出示例如下:
+当`debezium-json.include-schema.enabled=true`时,输出示例如下:
 ```json
 {
   "schema":{
diff --git a/docs/content/docs/connectors/pipeline-connectors/kafka.md b/docs/content/docs/connectors/pipeline-connectors/kafka.md
index b9cf96e40..b94efc149 100644
--- a/docs/content/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content/docs/connectors/pipeline-connectors/kafka.md
@@ -149,7 +149,7 @@ Pipeline Connector Options
       <td>Custom table mappings for each table from upstream tableId to downstream Kafka topic. Each mapping is separated by `;`; separate the upstream tableId and the downstream Kafka topic by `:`. For example, we can set `sink.tableId-to-topic.mapping` like `mydb.mytable1:topic1;mydb.mytable2:topic2`.</td>
     </tr>
     <tr>
-      <td>sink.debezium-json.include-schema.enabled</td>
+      <td>debezium-json.include-schema.enabled</td>
       <td>optional</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
@@ -185,7 +185,7 @@ An output example is:
 }
 }
 ```
-When `sink.debezium-json.include-schema.enabled` is true, the output format will be:
+When `debezium-json.include-schema.enabled` is true, the output format will be:
 ```json
 {
   "schema":{
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
index 3b55a05ba..a6dcdcd1e 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
@@ -20,7 +20,6 @@ package org.apache.flink.cdc.connectors.kafka.json;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.cdc.common.event.Event;
-import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.cdc.connectors.kafka.json.canal.CanalJsonSerializationSchema;
 import org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonSerializationSchema;
 import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
@@ -31,7 +30,7 @@ import org.apache.flink.formats.json.JsonFormatOptionsUtil;
 
 import java.time.ZoneId;
 
-import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED;
+import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED;
 import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
 import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
@@ -52,13 +51,15 @@ public class ChangeLogJsonFormatFactory {
      */
     public static SerializationSchema<Event> createSerializationSchema(
             ReadableConfig formatOptions, JsonSerializationType type, ZoneId zoneId) {
+        final String prefix = type.toString() + ".";
         boolean isIncludedDebeziumSchema =
                 Boolean.parseBoolean(
-                        formatOptions.toMap().get(SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED.key()));
-        Preconditions.checkArgument(
-                !(isIncludedDebeziumSchema && !type.equals(JsonSerializationType.DEBEZIUM_JSON)),
-                SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED.key()
-                        + " is only supported when using debezium-json.");
+                        formatOptions
+                                .toMap()
+                                .get(
+                                        DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED
+                                                .key()
+                                                .substring(prefix.length())));
 
         TimestampFormat timestampFormat = JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
         JsonFormatOptions.MapNullKeyMode mapNullKeyMode =
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
index 05090b30b..9d3f605cb 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
@@ -35,12 +35,12 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 
+import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.KEY_FORMAT;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PARTITION_STRATEGY;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PROPERTIES_PREFIX;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_CUSTOM_HEADER;
-import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.TOPIC;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.VALUE_FORMAT;
@@ -132,7 +132,7 @@ public class KafkaDataSinkFactory implements DataSinkFactory {
         options.add(SINK_CUSTOM_HEADER);
         options.add(KafkaDataSinkOptions.DELIVERY_GUARANTEE);
         options.add(SINK_TABLE_ID_TO_TOPIC_MAPPING);
-        options.add(SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED);
+        options.add(DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED);
         return options;
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
index a8eeed8cf..d6176a073 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
@@ -101,8 +101,8 @@ public class KafkaDataSinkOptions {
                                     + ". For example, we can set 'sink.tableId-to-topic.mapping' like 'mydb.mytable1:topic1;mydb.mytable2:topic2'.")
                             .build());
 
-    public static final ConfigOption<Boolean> SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED =
-            key("sink.debezium-json.include-schema.enabled")
+    public static final ConfigOption<Boolean> DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED =
+            key("debezium-json.include-schema.enabled")
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
index 4c9933bf2..6108f24d8 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
@@ -49,8 +49,6 @@ import java.time.ZoneId;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED;
-
 /** Tests for {@link DebeziumJsonSerializationSchema}. */
 class DebeziumJsonSerializationSchemaTest {
 
@@ -145,7 +143,7 @@ class DebeziumJsonSerializationSchemaTest {
                 JacksonMapperFactory.createObjectMapper()
                         .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
         Map<String, String> properties = new HashMap<>();
-        properties.put(SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED.key(), "true");
+        properties.put("include-schema.enabled", "true");
         Configuration configuration = Configuration.fromMap(properties);
         SerializationSchema<Event> serializationSchema =
                 ChangeLogJsonFormatFactory.createSerializationSchema(
@@ -205,7 +203,7 @@ class DebeziumJsonSerializationSchemaTest {
                         DataTypes.STRING());
 
         CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
-        Assertions.assertNull(serializationSchema.serialize(createTableEvent));
+        Assertions.assertThat(serializationSchema.serialize(createTableEvent)).isNull();
         BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
         // insert
         DataChangeEvent insertEvent1 =
@@ -244,6 +242,6 @@
                 mapper.readTree(
                         "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"boolean\",\"optional\":true,\"doc\":\"_boolean comment\",\"field\":\"_boolean\"},{\"type\":\"bytes\",\"optional\":true,\"name\":\"io.debezium.data.Bits\",\"version\":1,\"parameters\":{\"length\":\"3\"},\"field\":\"_binary\"},{\"type\":\"bytes\",\"optional\":true,\"name\":\"io.debezium.data.Bits\",\"version\":1,\"parameters\":{\"length\":\"10\"},\"field\":\"_varbinary\"},{\"t [...]
         JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
-        Assertions.assertEquals(expected, actual);
+        Assertions.assertThat(actual).isEqualTo(expected);
     }
 }
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java
index e85924558..b612f682d 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java
@@ -319,7 +319,7 @@ class MysqlToKafkaE2eITCase extends PipelineTestEnvironment {
                         + "  type: kafka\n"
                         + "  properties.bootstrap.servers: kafka:9092\n"
                         + "  topic: %s\n"
-                        + "  sink.debezium-json.include-schema.enabled: true\n"
+                        + "  debezium-json.include-schema.enabled: true\n"
                         + "\n"
                         + "pipeline:\n"
                         + "  parallelism: %d",
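
A note on the ChangeLogJsonFormatFactory hunk above: as the updated unit test suggests, the format options reach the factory with the format identifier prefix (here `debezium-json.`) already stripped, so the factory now derives the relative key from the full option key at runtime instead of rejecting non-debezium-json formats with a precondition. Below is a minimal, self-contained sketch of that lookup in plain Java with no Flink dependencies; the class and method names are illustrative only, not connector API.

```java
import java.util.Map;

// Hypothetical demo class, not part of the connector.
public class IncludeSchemaLookupSketch {

    // Mirrors KafkaDataSinkOptions.DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED.key()
    static final String FULL_KEY = "debezium-json.include-schema.enabled";

    static boolean isIncludedDebeziumSchema(
            String formatIdentifier, Map<String, String> formatOptions) {
        // Mirrors `type.toString() + "."` in ChangeLogJsonFormatFactory.
        String prefix = formatIdentifier + ".";
        // "debezium-json.include-schema.enabled" -> "include-schema.enabled",
        // because the sink strips the format prefix before handing options over.
        String relativeKey = FULL_KEY.substring(prefix.length());
        // Missing key yields null, and Boolean.parseBoolean(null) is false,
        // matching the option's default value.
        return Boolean.parseBoolean(formatOptions.get(relativeKey));
    }

    public static void main(String[] args) {
        // The updated unit test seeds the map the same way:
        // properties.put("include-schema.enabled", "true");
        Map<String, String> options = Map.of("include-schema.enabled", "true");
        System.out.println(isIncludedDebeziumSchema("debezium-json", options)); // true
    }
}
```

This is also why the unit test now puts the bare key "include-schema.enabled" into the format options, while the end-to-end pipeline YAML sets the fully prefixed debezium-json.include-schema.enabled under the sink definition.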