wuchong commented on a change in pull request #13333:
URL: https://github.com/apache/flink/pull/13333#discussion_r485428642



##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
##########
@@ -58,14 +58,20 @@ public DebeziumJsonSerializationSchema(RowType rowType, TimestampFormat timestam
 
        @Override
        public void open(InitializationContext context) {
-               genericRowData = new GenericRowData(2);
+               genericRowData = new GenericRowData(3);
        }
 
        @Override
        public byte[] serialize(RowData rowData) {
                try {
-                       genericRowData.setField(0, rowData);
-                       genericRowData.setField(1, rowKind2String(rowData.getRowKind()));
+                       if (RowKind.INSERT == rowData.getRowKind()) {

Review comment:
       We should also consider `UPDATE_AFTER` and `UPDATE_BEFORE`.
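
       To illustrate the point, here is a minimal, self-contained sketch of one way all four row kinds could be covered. It is not the code from this PR. The `RowKind` enum below is a local stand-in for `org.apache.flink.types.RowKind`, the class and method names are hypothetical, and the `{before, after, op}` field order is an assumption based on the three-field `GenericRowData`. It also assumes an update is written as a delete (`"d"`) of the old row followed by a create (`"c"`) of the new row, since each side of an update arrives as a separate record.

```java
import java.util.Arrays;

// Hypothetical sketch; names are illustrative and not taken from the PR.
class DebeziumEnvelopeSketch {

    // Local stand-in for org.apache.flink.types.RowKind so the example runs without Flink.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Debezium operation codes for "create" and "delete".
    static final String OP_CREATE = "c";
    static final String OP_DELETE = "d";

    // Builds an envelope as {before, after, op}; this field order is an assumption.
    static Object[] toEnvelope(RowKind kind, Object row) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                // The row is the new image: no "before", row goes in "after".
                return new Object[] {null, row, OP_CREATE};
            case UPDATE_BEFORE:
            case DELETE:
                // The row is the old image: row goes in "before", no "after".
                return new Object[] {row, null, OP_DELETE};
            default:
                throw new UnsupportedOperationException("Unsupported row kind: " + kind);
        }
    }

    public static void main(String[] args) {
        // Print the envelope produced for every row kind.
        for (RowKind kind : RowKind.values()) {
            System.out.println(kind + " -> " + Arrays.toString(toEnvelope(kind, "row")));
        }
    }
}
```

       A `switch` with a `default` branch fails fast if a new row kind is added later, rather than silently emitting a wrong envelope.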





