lvyanquan commented on code in PR #3791:
URL: https://github.com/apache/flink-cdc/pull/3791#discussion_r1976801568
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java:
##########
@@ -100,4 +100,11 @@ public class KafkaDataSinkOptions {
.text(
". For example, we can set
'sink.tableId-to-topic.mappingg' like
'mydb.mytable1:topic1;mydb.mytable2:topic2'.")
.build());
+
+ public static final ConfigOption<Boolean>
SINK_DEBEZIUM_JSON_SCHEMA_ENABLED =
+ key("sink.debezium-json-schema.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional. If this parameter is configured, each
debezium record will contain debezium schema information.Only
value.format=debezium-json is supported.");
Review Comment:
nit: add a space after the period:
information. Only
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java:
##########
@@ -72,9 +74,15 @@ public DataSink createDataSink(Context context) {
KeySerializationFactory.createSerializationSchema(configuration, keyFormat,
zoneId);
JsonSerializationType jsonSerializationType =
context.getFactoryConfiguration().get(KafkaDataSinkOptions.VALUE_FORMAT);
+ Boolean isIncludedDebeziumSchema =
+
context.getFactoryConfiguration().get(SINK_DEBEZIUM_JSON_SCHEMA_ENABLED);
+ Preconditions.checkArgument(
+ !(isIncludedDebeziumSchema
+ &&
!jsonSerializationType.equals(JsonSerializationType.DEBEZIUM_JSON)),
+ "sink.debezium_json_schema.enabled only supported
debezium-json.");
Review Comment:
Suggested wording for the message: "... is only supported when using debezium-json."
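
The check itself could also be written without the double negation. A minimal, Flink-free sketch of that idea follows. The class name `SchemaOptionCheckSketch`, the trimmed-down enum, and the use of `IllegalArgumentException` in place of `Preconditions.checkArgument` are assumptions made for illustration, and the message uses the option key as declared in `KafkaDataSinkOptions`.

```java
public class SchemaOptionCheckSketch {
    // Trimmed-down stand-in for the connector's JsonSerializationType (assumption).
    enum JsonSerializationType { DEBEZIUM_JSON, CANAL_JSON }

    // Rejects the schema flag for any value format other than debezium-json.
    static void validate(boolean includeSchema, JsonSerializationType type) {
        if (includeSchema && type != JsonSerializationType.DEBEZIUM_JSON) {
            throw new IllegalArgumentException(
                    "sink.debezium-json-schema.enabled is only supported when using debezium-json.");
        }
    }

    public static void main(String[] args) {
        // Accepted: flag enabled together with debezium-json.
        validate(true, JsonSerializationType.DEBEZIUM_JSON);
        // Accepted: flag disabled, any format.
        validate(false, JsonSerializationType.CANAL_JSON);
        System.out.println("checks passed");
    }
}
```

Comparing enum constants with `!=` reads more directly than `!a.equals(b)` and behaves the same for enums.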
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java:
##########
@@ -49,7 +49,10 @@ public class ChangeLogJsonFormatFactory {
* @return The configured instance of {@link SerializationSchema}.
*/
public static SerializationSchema<Event> createSerializationSchema(
- ReadableConfig formatOptions, JsonSerializationType type, ZoneId
zoneId) {
+ ReadableConfig formatOptions,
+ JsonSerializationType type,
+ ZoneId zoneId,
+ boolean isIncludedDebeziumSchema) {
Review Comment:
I would prefer not to change the parameters of this method, since we can extract
`isIncludedDebeziumSchema` from formatOptions.
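
Roughly what I have in mind, as a sketch that runs without Flink on the classpath. A plain `Map<String, String>` stands in for `ReadableConfig`, and the class name `FormatOptionsSketch`, the helper `isIncludeSchemaEnabled`, and the `String` return value are all hypothetical. In the real factory this would presumably be a single `formatOptions.get(...)` call on the new option, assuming the option is actually reachable from the config passed to this method.

```java
import java.util.Map;

public class FormatOptionsSketch {
    // Key copied from the option added in this PR.
    static final String INCLUDE_SCHEMA_KEY = "sink.debezium-json-schema.enabled";

    // Hypothetical helper: reads the flag from the options,
    // falling back to false like the ConfigOption's default.
    static boolean isIncludeSchemaEnabled(Map<String, String> formatOptions) {
        return Boolean.parseBoolean(formatOptions.getOrDefault(INCLUDE_SCHEMA_KEY, "false"));
    }

    // Keeps the original three-argument shape (options, type, zone);
    // the flag is derived inside instead of being passed as a fourth parameter.
    static String createSerializationSchema(
            Map<String, String> formatOptions, String type, String zoneId) {
        boolean includeSchema = isIncludeSchemaEnabled(formatOptions);
        // A string stands in for the real SerializationSchema instance.
        return type + (includeSchema ? "+schema" : "");
    }

    public static void main(String[] args) {
        Map<String, String> options = Map.of(INCLUDE_SCHEMA_KEY, "true");
        System.out.println(createSerializationSchema(options, "debezium-json", "UTC"));
    }
}
```

This way existing callers of `createSerializationSchema` do not need to change.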
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java:
##########
@@ -100,4 +100,11 @@ public class KafkaDataSinkOptions {
.text(
". For example, we can set
'sink.tableId-to-topic.mappingg' like
'mydb.mytable1:topic1;mydb.mytable2:topic2'.")
.build());
+
+ public static final ConfigOption<Boolean>
SINK_DEBEZIUM_JSON_SCHEMA_ENABLED =
+ key("sink.debezium-json-schema.enabled")
Review Comment:
Suggested key name: sink.debezium-json.include-schema.enabled