This is an automated email from the ASF dual-hosted git repository.
renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 9d6154f12 [FLINK-35791][kafka] Add database and table info of Canal /
Debezium json format for Kafka sink (#3461)
9d6154f12 is described below
commit 9d6154f12307da24551a6f24b5be72313d01837c
Author: Kunni <[email protected]>
AuthorDate: Thu Aug 8 16:06:43 2024 +0800
[FLINK-35791][kafka] Add database and table info of Canal / Debezium json
format for Kafka sink (#3461)
---
.../docs/connectors/pipeline-connectors/kafka.md | 41 +++++++++++++++
.../docs/connectors/pipeline-connectors/kafka.md | 41 +++++++++++++++
.../json/canal/CanalJsonSerializationSchema.java | 25 +++++++--
.../debezium/DebeziumJsonSerializationSchema.java | 21 ++++++--
.../canal/CanalJsonSerializationSchemaTest.java | 8 +--
.../DebeziumJsonSerializationSchemaTest.java | 8 +--
.../connectors/kafka/sink/KafkaDataSinkITCase.java | 60 ++++++++++++++++------
7 files changed, 173 insertions(+), 31 deletions(-)
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
index 94f859106..aa9dc8879 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
@@ -140,6 +140,47 @@ Pipeline 连接器配置项
* 如果配置了 `topic` 参数,所有的消息都会发送到这一个主题。
* 写入 Kafka 的 topic 如果不存在,则会默认创建。
+### 输出格式
+对于不同的内置 `value.format` 选项,输出的格式也是不同的:
+#### debezium-json
+参考 [Debezium
docs](https://debezium.io/documentation/reference/1.9/connectors/mysql.html),
debezium-json 格式会包含 `before`,`after`,`op`,`source` 几个元素, 但是 `ts_ms` 字段并不会包含在
`source` 元素中。
+一个输出的示例是:
+```json
+{
+ "before": null,
+ "after": {
+ "col1": "1",
+ "col2": "1"
+ },
+ "op": "c",
+ "source": {
+ "db": "default_namespace",
+ "table": "table1"
+ }
+}
+```
+
+#### canal-json
+参考 [Canal | Apache
Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/canal/#available-metadata),
canal-json 格式会包含 `old`,`data`,`type`,`database`,`table`,`pkNames` 几个元素, 但是
`ts` 并不会包含在其中。
+一个输出的示例是:
+```json
+{
+ "old": null,
+ "data": [
+ {
+ "col1": "1",
+ "col2": "1"
+ }
+ ],
+ "type": "INSERT",
+ "database": "default_schema",
+ "table": "table1",
+ "pkNames": [
+ "col1"
+ ]
+}
+```
+
数据类型映射
----------------
<div class="wy-table-responsive">
diff --git a/docs/content/docs/connectors/pipeline-connectors/kafka.md
b/docs/content/docs/connectors/pipeline-connectors/kafka.md
index 96f599509..57f690666 100644
--- a/docs/content/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content/docs/connectors/pipeline-connectors/kafka.md
@@ -138,6 +138,47 @@ Usage Notes
* If the written topic of Kafka is not existed, we will create one
automatically.
+### Output Format
+For different built-in `value.format` options, the output format is different:
+#### debezium-json
+Refer to [Debezium
docs](https://debezium.io/documentation/reference/1.9/connectors/mysql.html),
debezium-json format will contains `before`,`after`,`op`,`source` elements, but
`ts_ms` is not included in `source`.
+An output example is:
+```json
+{
+ "before": null,
+ "after": {
+ "col1": "1",
+ "col2": "1"
+ },
+ "op": "c",
+ "source": {
+ "db": "default_namespace",
+ "table": "table1"
+ }
+}
+```
+
+#### canal-json
+Refer to [Canal | Apache
Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/canal/#available-metadata),
canal-json format will contains
`old`,`data`,`type`,`database`,`table`,`pkNames` elements, but `ts` is not
included.
+An output example is:
+```json
+{
+ "old": null,
+ "data": [
+ {
+ "col1": "1",
+ "col2": "1"
+ }
+ ],
+ "type": "INSERT",
+ "database": "default_schema",
+ "table": "table1",
+ "pkNames": [
+ "col1"
+ ]
+}
+```
+
Data Type Mapping
----------------
<div class="wy-table-responsive">
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
index 78548e31b..0a145cab7 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
@@ -94,7 +94,7 @@ public class CanalJsonSerializationSchema implements
SerializationSchema<Event>
@Override
public void open(InitializationContext context) {
this.context = context;
- reuseGenericRowData = new GenericRowData(3);
+ reuseGenericRowData = new GenericRowData(6);
}
@Override
@@ -132,6 +132,17 @@ public class CanalJsonSerializationSchema implements
SerializationSchema<Event>
}
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
+ reuseGenericRowData.setField(
+ 3,
StringData.fromString(dataChangeEvent.tableId().getSchemaName()));
+ reuseGenericRowData.setField(
+ 4,
StringData.fromString(dataChangeEvent.tableId().getTableName()));
+ reuseGenericRowData.setField(
+ 5,
+ new GenericArrayData(
+
jsonSerializers.get(dataChangeEvent.tableId()).getSchema().primaryKeys()
+ .stream()
+ .map(StringData::fromString)
+ .toArray()));
try {
switch (dataChangeEvent.op()) {
case INSERT:
@@ -200,14 +211,20 @@ public class CanalJsonSerializationSchema implements
SerializationSchema<Event>
}
}
+ /**
+ * Refer to <a
+ *
href="https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/canal/#available-metadata">Canal
+ * | Apache Flink</a> for more details.
+ */
private static RowType createJsonRowType(DataType databaseSchema) {
- // Canal JSON contains other information, e.g. "database", "ts"
- // but we don't need them
return (RowType)
DataTypes.ROW(
DataTypes.FIELD("old",
DataTypes.ARRAY(databaseSchema)),
DataTypes.FIELD("data",
DataTypes.ARRAY(databaseSchema)),
- DataTypes.FIELD("type", DataTypes.STRING()))
+ DataTypes.FIELD("type", DataTypes.STRING()),
+ DataTypes.FIELD("database",
DataTypes.STRING()),
+ DataTypes.FIELD("table", DataTypes.STRING()),
+ DataTypes.FIELD("pkNames",
DataTypes.ARRAY(DataTypes.STRING())))
.getLogicalType();
}
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
index 2f305ce42..ce8afc0db 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
@@ -92,7 +92,7 @@ public class DebeziumJsonSerializationSchema implements
SerializationSchema<Even
@Override
public void open(InitializationContext context) {
- reuseGenericRowData = new GenericRowData(3);
+ reuseGenericRowData = new GenericRowData(4);
this.context = context;
}
@@ -131,6 +131,11 @@ public class DebeziumJsonSerializationSchema implements
SerializationSchema<Even
}
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
+ reuseGenericRowData.setField(
+ 3,
+ GenericRowData.of(
+
StringData.fromString(dataChangeEvent.tableId().getSchemaName()),
+
StringData.fromString(dataChangeEvent.tableId().getTableName())));
try {
switch (dataChangeEvent.op()) {
case INSERT:
@@ -185,14 +190,22 @@ public class DebeziumJsonSerializationSchema implements
SerializationSchema<Even
}
}
+ /**
+ * Refer to <a
+ *
href="https://debezium.io/documentation/reference/1.9/connectors/mysql.html">Debezium
+ * docs</a> for more details.
+ */
private static RowType createJsonRowType(DataType databaseSchema) {
- // Debezium JSON contains some other information, e.g. "source",
"ts_ms"
- // but we don't need them.
return (RowType)
DataTypes.ROW(
DataTypes.FIELD("before", databaseSchema),
DataTypes.FIELD("after", databaseSchema),
- DataTypes.FIELD("op", DataTypes.STRING()))
+ DataTypes.FIELD("op", DataTypes.STRING()),
+ DataTypes.FIELD(
+ "source",
+ DataTypes.ROW(
+ DataTypes.FIELD("db",
DataTypes.STRING()),
+ DataTypes.FIELD("table",
DataTypes.STRING()))))
.getLogicalType();
}
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
index c6335e6d6..362354c6e 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
@@ -83,7 +83,7 @@ public class CanalJsonSerializationSchemaTest {
}));
JsonNode expected =
mapper.readTree(
-
"{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\"}");
+
"{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
JsonNode actual =
mapper.readTree(serializationSchema.serialize(insertEvent1));
Assertions.assertEquals(expected, actual);
@@ -97,7 +97,7 @@ public class CanalJsonSerializationSchemaTest {
}));
expected =
mapper.readTree(
-
"{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\"}");
+
"{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
actual = mapper.readTree(serializationSchema.serialize(insertEvent2));
Assertions.assertEquals(expected, actual);
@@ -111,7 +111,7 @@ public class CanalJsonSerializationSchemaTest {
}));
expected =
mapper.readTree(
-
"{\"old\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"data\":null,\"type\":\"DELETE\"}");
+
"{\"old\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
actual = mapper.readTree(serializationSchema.serialize(deleteEvent));
Assertions.assertEquals(expected, actual);
@@ -130,7 +130,7 @@ public class CanalJsonSerializationSchemaTest {
}));
expected =
mapper.readTree(
-
"{\"old\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"data\":[{\"col1\":\"1\",\"col2\":\"x\"}],\"type\":\"UPDATE\"}");
+
"{\"old\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"data\":[{\"col1\":\"1\",\"col2\":\"x\"}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
actual = mapper.readTree(serializationSchema.serialize(updateEvent));
Assertions.assertEquals(expected, actual);
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
index 0be02b9b3..f2b36e4ef 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
@@ -81,7 +81,7 @@ public class DebeziumJsonSerializationSchemaTest {
}));
JsonNode expected =
mapper.readTree(
-
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\"}");
+
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
JsonNode actual =
mapper.readTree(serializationSchema.serialize(insertEvent1));
Assertions.assertEquals(expected, actual);
DataChangeEvent insertEvent2 =
@@ -94,7 +94,7 @@ public class DebeziumJsonSerializationSchemaTest {
}));
expected =
mapper.readTree(
-
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\"}");
+
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
actual = mapper.readTree(serializationSchema.serialize(insertEvent2));
Assertions.assertEquals(expected, actual);
DataChangeEvent deleteEvent =
@@ -107,7 +107,7 @@ public class DebeziumJsonSerializationSchemaTest {
}));
expected =
mapper.readTree(
-
"{\"before\":{\"col1\":\"2\",\"col2\":\"2\"},\"after\":null,\"op\":\"d\"}");
+
"{\"before\":{\"col1\":\"2\",\"col2\":\"2\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
actual = mapper.readTree(serializationSchema.serialize(deleteEvent));
Assertions.assertEquals(expected, actual);
DataChangeEvent updateEvent =
@@ -125,7 +125,7 @@ public class DebeziumJsonSerializationSchemaTest {
}));
expected =
mapper.readTree(
-
"{\"before\":{\"col1\":\"1\",\"col2\":\"1\"},\"after\":{\"col1\":\"1\",\"col2\":\"x\"},\"op\":\"u\"}");
+
"{\"before\":{\"col1\":\"1\",\"col2\":\"1\"},\"after\":{\"col1\":\"1\",\"col2\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
actual = mapper.readTree(serializationSchema.serialize(updateEvent));
Assertions.assertEquals(expected, actual);
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
index 2e2656596..1ef10603b 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
@@ -268,15 +268,25 @@ class KafkaDataSinkITCase extends TestLogger {
List<JsonNode> expected =
Arrays.asList(
mapper.readTree(
-
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\"}"),
+ String.format(
+
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ table1.getTableName())),
mapper.readTree(
-
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\"}"),
+ String.format(
+
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ table1.getTableName())),
mapper.readTree(
-
"{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\"}"),
+ String.format(
+
"{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ table1.getTableName())),
mapper.readTree(
-
"{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\"}"),
+ String.format(
+
"{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ table1.getTableName())),
mapper.readTree(
-
"{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\"}"));
+ String.format(
+
"{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ table1.getTableName())));
assertThat(deserializeValues(collectedRecords)).containsAll(expected);
checkProducerLeak();
}
@@ -330,15 +340,25 @@ class KafkaDataSinkITCase extends TestLogger {
List<JsonNode> expected =
Arrays.asList(
mapper.readTree(
-
"{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\"}"),
+ String.format(
+
"{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+ table1.getTableName())),
mapper.readTree(
-
"{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\"}"),
+ String.format(
+
"{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+ table1.getTableName())),
mapper.readTree(
-
"{\"old\":null,\"data\":[{\"col1\":\"3\",\"col2\":\"3\"}],\"type\":\"INSERT\"}"),
+ String.format(
+
"{\"old\":null,\"data\":[{\"col1\":\"3\",\"col2\":\"3\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+ table1.getTableName())),
mapper.readTree(
-
"{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\"}"),
+ String.format(
+
"{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+ table1.getTableName())),
mapper.readTree(
-
"{\"old\":[{\"col1\":\"2\",\"newCol3\":\"\"}],\"data\":[{\"col1\":\"2\",\"newCol3\":\"x\"}],\"type\":\"UPDATE\"}"));
+ String.format(
+
"{\"old\":[{\"col1\":\"2\",\"newCol3\":\"\"}],\"data\":[{\"col1\":\"2\",\"newCol3\":\"x\"}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+ table1.getTableName())));
assertThat(deserializeValues(collectedRecords)).containsAll(expected);
checkProducerLeak();
}
@@ -416,15 +436,25 @@ class KafkaDataSinkITCase extends TestLogger {
List<JsonNode> expected =
Arrays.asList(
mapper.readTree(
-
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\"}"),
+ String.format(
+
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ table1.getTableName())),
mapper.readTree(
-
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\"}"),
+ String.format(
+
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ table1.getTableName())),
mapper.readTree(
-
"{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\"}"),
+ String.format(
+
"{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ table1.getTableName())),
mapper.readTree(
-
"{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\"}"),
+ String.format(
+
"{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ table1.getTableName())),
mapper.readTree(
-
"{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\"}"));
+ String.format(
+
"{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ table1.getTableName())));
assertThat(deserializeValues(collectedRecords)).containsAll(expected);
checkProducerLeak();
}