This is an automated email from the ASF dual-hosted git repository.
libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 93555a7d55d [FLINK-16627][json] Support ignore null fields when
serializing into JSON
93555a7d55d is described below
commit 93555a7d55dc27571122cccb7c4d8af2c5db54cb
Author: zhouyisha <[email protected]>
AuthorDate: Fri Mar 1 17:11:32 2024 +0800
[FLINK-16627][json] Support ignore null fields when serializing into JSON
Close apache/flink#24430
---
.../docs/connectors/table/formats/debezium.md | 7 ++
.../docs/connectors/table/formats/json.md | 7 ++
.../docs/connectors/table/formats/maxwell.md | 7 ++
.../docs/connectors/table/formats/ogg.md | 9 ++-
.../docs/connectors/table/formats/debezium.md | 7 ++
docs/content/docs/connectors/table/formats/json.md | 8 +++
.../docs/connectors/table/formats/maxwell.md | 7 ++
docs/content/docs/connectors/table/formats/ogg.md | 7 ++
.../hive/util/ThriftObjectConversions.java | 2 +-
.../flink/formats/json/JsonFormatFactory.java | 7 +-
.../flink/formats/json/JsonFormatOptions.java | 7 ++
.../json/JsonRowDataSerializationSchema.java | 21 ++++--
.../formats/json/RowDataToJsonConverters.java | 15 ++--
.../formats/json/canal/CanalJsonFormatFactory.java | 7 +-
.../json/canal/CanalJsonSerializationSchema.java | 6 +-
.../json/debezium/DebeziumJsonFormatFactory.java | 6 +-
.../debezium/DebeziumJsonSerializationSchema.java | 6 +-
.../json/maxwell/MaxwellJsonFormatFactory.java | 7 +-
.../maxwell/MaxwellJsonSerializationSchema.java | 6 +-
.../formats/json/ogg/OggJsonFormatFactory.java | 7 +-
.../json/ogg/OggJsonSerializationSchema.java | 6 +-
.../flink/formats/json/JsonFormatFactoryTest.java | 2 +
.../formats/json/JsonRowDataSerDeSchemaTest.java | 84 +++++++++++++++++++---
.../json/canal/CanalJsonFormatFactoryTest.java | 3 +
.../json/canal/CanalJsonSerDeSchemaTest.java | 3 +-
.../debezium/DebeziumJsonFormatFactoryTest.java | 2 +
.../json/debezium/DebeziumJsonSerDeSchemaTest.java | 3 +-
.../json/maxwell/MaxwellJsonFormatFactoryTest.java | 2 +
.../json/maxwell/MaxwellJsonSerDerTest.java | 3 +-
.../formats/json/ogg/OggJsonFormatFactoryTest.java | 2 +
.../formats/json/ogg/OggJsonSerDeSchemaTest.java | 3 +-
.../gateway/rest/serde/ResultInfoSerializer.java | 5 +-
32 files changed, 236 insertions(+), 38 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/formats/debezium.md
b/docs/content.zh/docs/connectors/table/formats/debezium.md
index a6ac486f0f0..7ba62dd408d 100644
--- a/docs/content.zh/docs/connectors/table/formats/debezium.md
+++ b/docs/content.zh/docs/connectors/table/formats/debezium.md
@@ -424,6 +424,13 @@ Flink 提供了 `debezium-avro-confluent` 和 `debezium-json` 两种
format 来
<td>Boolean</td>
<td>将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:<code>0.000000027</code> 默认会表示为
<code>2.7E-8</code>。当此选项设为 true 时,则会表示为 <code>0.000000027</code>。</td>
</tr>
+ <tr>
+ <td><h5>debezium-json.encode.ignore-null-fields</h5></td>
+ <td>选填</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>仅序列化非 Null 的列。默认情况下,无论是否为 Null,所有列都会被序列化。</td>
+ </tr>
</tbody>
</table>
diff --git a/docs/content.zh/docs/connectors/table/formats/json.md
b/docs/content.zh/docs/connectors/table/formats/json.md
index f1acdd7a001..005485a7a0a 100644
--- a/docs/content.zh/docs/connectors/table/formats/json.md
+++ b/docs/content.zh/docs/connectors/table/formats/json.md
@@ -135,6 +135,13 @@ Format 参数
<td>Boolean</td>
<td>将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:<code>0.000000027</code> 默认会表示为
<code>2.7E-8</code>。当此选项设为 true 时,则会表示为 <code>0.000000027</code>。</td>
</tr>
+ <tr>
+ <td><h5>json.encode.ignore-null-fields</h5></td>
+ <td>选填</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>仅序列化非 Null 的列。默认情况下,无论是否为 Null,所有列都会被序列化。</td>
+ </tr>
<tr>
<td><h5>decode.json-parser.enabled</h5></td>
<td>选填</td>
diff --git a/docs/content.zh/docs/connectors/table/formats/maxwell.md
b/docs/content.zh/docs/connectors/table/formats/maxwell.md
index a3ac161f231..0bdedeac682 100644
--- a/docs/content.zh/docs/connectors/table/formats/maxwell.md
+++ b/docs/content.zh/docs/connectors/table/formats/maxwell.md
@@ -251,6 +251,13 @@ Format Options
<td>Boolean</td>
<td>Encode all decimals as plain numbers instead of possible scientific
notations. By default, decimals may be written using scientific notation. For
example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default,
and will be written as <code>0.000000027</code> if set this option to true.</td>
</tr>
+ <tr>
+ <td><h5>maxwell-json.encode.ignore-null-fields</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Encode only non-null fields. By default, all fields will be
included.</td>
+ </tr>
</tbody>
</table>
diff --git a/docs/content.zh/docs/connectors/table/formats/ogg.md
b/docs/content.zh/docs/connectors/table/formats/ogg.md
index c8e8a7a6c6d..61ec97b60fd 100644
--- a/docs/content.zh/docs/connectors/table/formats/ogg.md
+++ b/docs/content.zh/docs/connectors/table/formats/ogg.md
@@ -216,7 +216,7 @@ Format Options
<td>当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为
false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。</td>
</tr>
<tr>
- <td><h5>debezium-json.timestamp-format.standard</h5></td>
+ <td><h5>ogg-json.timestamp-format.standard</h5></td>
<td>可选</td>
<td style="word-wrap: break-word;"><code>'SQL'</code></td>
<td>String</td>
@@ -247,6 +247,13 @@ Format Options
<td>String</td>
<td>当 <code>'ogg-json.map-null-key.mode'</code> 是 LITERAL 的时候,指定字符串常量替换
Map 中的空 key 值。</td>
</tr>
+ <tr>
+ <td><h5>ogg-json.encode.ignore-null-fields</h5></td>
+ <td>选填</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>仅序列化非 Null 的列。默认情况下,无论是否为 Null,所有列都会被序列化。</td>
+ </tr>
</tbody>
</table>
diff --git a/docs/content/docs/connectors/table/formats/debezium.md
b/docs/content/docs/connectors/table/formats/debezium.md
index 790196e2588..f69e3dc5d8f 100644
--- a/docs/content/docs/connectors/table/formats/debezium.md
+++ b/docs/content/docs/connectors/table/formats/debezium.md
@@ -445,6 +445,13 @@ Use format `debezium-avro-confluent` to interpret Debezium
Avro messages and for
<td>Boolean</td>
<td>Encode all decimals as plain numbers instead of possible scientific
notations. By default, decimals may be written using scientific notation. For
example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default,
and will be written as <code>0.000000027</code> if set this option to true.</td>
</tr>
+ <tr>
+ <td><h5>debezium-json.encode.ignore-null-fields</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Encode only non-null fields. By default, all fields will be
included.</td>
+ </tr>
</tbody>
</table>
diff --git a/docs/content/docs/connectors/table/formats/json.md
b/docs/content/docs/connectors/table/formats/json.md
index 52345a42ea1..64592ac28be 100644
--- a/docs/content/docs/connectors/table/formats/json.md
+++ b/docs/content/docs/connectors/table/formats/json.md
@@ -146,6 +146,14 @@ Format Options
<td>Boolean</td>
<td>Encode all decimals as plain numbers instead of possible scientific
notations. By default, decimals may be written using scientific notation. For
example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default,
and will be written as <code>0.000000027</code> if set this option to true.</td>
</tr>
+ <tr>
+ <td><h5>json.encode.ignore-null-fields</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Encode only non-null fields. By default, all fields will be
included.</td>
+ </tr>
<tr>
<td><h5>decode.json-parser.enabled</h5></td>
<td>optional</td>
diff --git a/docs/content/docs/connectors/table/formats/maxwell.md
b/docs/content/docs/connectors/table/formats/maxwell.md
index a7a98270f38..47c87442c73 100644
--- a/docs/content/docs/connectors/table/formats/maxwell.md
+++ b/docs/content/docs/connectors/table/formats/maxwell.md
@@ -251,6 +251,13 @@ Format Options
<td>Boolean</td>
<td>Encode all decimals as plain numbers instead of possible scientific
notations. By default, decimals may be written using scientific notation. For
example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default,
and will be written as <code>0.000000027</code> if set this option to true.</td>
</tr>
+ <tr>
+ <td><h5>maxwell-json.encode.ignore-null-fields</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Encode only non-null fields. By default, all fields will be
included.</td>
+ </tr>
</tbody>
</table>
diff --git a/docs/content/docs/connectors/table/formats/ogg.md
b/docs/content/docs/connectors/table/formats/ogg.md
index 482273af8ce..3b53916e36d 100644
--- a/docs/content/docs/connectors/table/formats/ogg.md
+++ b/docs/content/docs/connectors/table/formats/ogg.md
@@ -260,6 +260,13 @@ Format Options
<td>String</td>
<td>Specify string literal to replace null key when
<code>'ogg-json.map-null-key.mode'</code> is LITERAL.</td>
</tr>
+ <tr>
+ <td><h5>ogg-json.encode.ignore-null-fields</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Encode only non-null fields. By default, all fields will be
included.</td>
+ </tr>
</tbody>
</table>
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
index 35a40ea187f..fbc7d89d6ad 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
@@ -108,7 +108,7 @@ public class ThriftObjectConversions {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final RowDataToJsonConverters TO_JSON_CONVERTERS =
new RowDataToJsonConverters(
- TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.LITERAL, "null");
+ TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.LITERAL, "null", false);
private static final Map<String, TableKind> TABLE_TYPE_MAPPINGS =
buildTableTypeMapping();
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
index 562b99e6bc8..6ecc9a0f086 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
@@ -46,6 +46,7 @@ import java.util.Set;
import static
org.apache.flink.formats.json.JsonFormatOptions.DECODE_JSON_PARSER_ENABLED;
import static
org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static
org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
import static
org.apache.flink.formats.json.JsonFormatOptions.FAIL_ON_MISSING_FIELD;
import static
org.apache.flink.formats.json.JsonFormatOptions.IGNORE_PARSE_ERRORS;
import static
org.apache.flink.formats.json.JsonFormatOptions.MAP_NULL_KEY_LITERAL;
@@ -147,6 +148,7 @@ public class JsonFormatFactory implements
DeserializationFormatFactory, Serializ
final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ final boolean ignoreNullFields =
formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);
return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
@@ -158,7 +160,8 @@ public class JsonFormatFactory implements
DeserializationFormatFactory, Serializ
timestampOption,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
@Override
@@ -187,6 +190,7 @@ public class JsonFormatFactory implements
DeserializationFormatFactory, Serializ
options.add(MAP_NULL_KEY_MODE);
options.add(MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ options.add(ENCODE_IGNORE_NULL_FIELDS);
return options;
}
@@ -197,6 +201,7 @@ public class JsonFormatFactory implements
DeserializationFormatFactory, Serializ
options.add(MAP_NULL_KEY_MODE);
options.add(MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ options.add(ENCODE_IGNORE_NULL_FIELDS);
return options;
}
}
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java
index 5c9e61068ac..cc40b325d91 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java
@@ -73,6 +73,13 @@ public class JsonFormatOptions {
.withDescription(
"Optional flag to specify whether to encode all
decimals as plain numbers instead of possible scientific notations, false by
default.");
+ public static final ConfigOption<Boolean> ENCODE_IGNORE_NULL_FIELDS =
+ ConfigOptions.key("encode.ignore-null-fields")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional flag to specify whether to ignore null
fields when encoding, false by default.");
+
public static final ConfigOption<Boolean> DECODE_JSON_PARSER_ENABLED =
ConfigOptions.key("decode.json-parser.enabled")
.booleanType()
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
index 376d0d568a3..4b68bb0c2af 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -68,19 +68,28 @@ public class JsonRowDataSerializationSchema implements
SerializationSchema<RowDa
/** Flag indicating whether to serialize all decimals as plain numbers. */
private final boolean encodeDecimalAsPlainNumber;
+ /** Flag indicating whether to ignore null fields. */
+ private final boolean ignoreNullFields;
+
public JsonRowDataSerializationSchema(
RowType rowType,
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
- boolean encodeDecimalAsPlainNumber) {
+ boolean encodeDecimalAsPlainNumber,
+ boolean ignoreNullFields) {
this.rowType = rowType;
this.timestampFormat = timestampFormat;
this.mapNullKeyMode = mapNullKeyMode;
this.mapNullKeyLiteral = mapNullKeyLiteral;
this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
+ this.ignoreNullFields = ignoreNullFields;
this.runtimeConverter =
- new RowDataToJsonConverters(timestampFormat, mapNullKeyMode,
mapNullKeyLiteral)
+ new RowDataToJsonConverters(
+ timestampFormat,
+ mapNullKeyMode,
+ mapNullKeyLiteral,
+ ignoreNullFields)
.createConverter(rowType);
}
@@ -95,7 +104,7 @@ public class JsonRowDataSerializationSchema implements
SerializationSchema<RowDa
@Override
public byte[] serialize(RowData row) {
- if (node == null) {
+ if (node == null || ignoreNullFields) {
node = mapper.createObjectNode();
}
@@ -120,7 +129,8 @@ public class JsonRowDataSerializationSchema implements
SerializationSchema<RowDa
&& timestampFormat.equals(that.timestampFormat)
&& mapNullKeyMode.equals(that.mapNullKeyMode)
&& mapNullKeyLiteral.equals(that.mapNullKeyLiteral)
- && encodeDecimalAsPlainNumber ==
that.encodeDecimalAsPlainNumber;
+ && encodeDecimalAsPlainNumber ==
that.encodeDecimalAsPlainNumber
+ && ignoreNullFields == that.ignoreNullFields;
}
@Override
@@ -130,6 +140,7 @@ public class JsonRowDataSerializationSchema implements
SerializationSchema<RowDa
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
}
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
index 2a1cd076ecf..7ca5f59d183 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
@@ -68,13 +68,18 @@ public class RowDataToJsonConverters implements
Serializable {
/** The string literal used when the handling mode for a map null key is LITERAL. */
private final String mapNullKeyLiteral;
+ /** Flag indicating whether to ignore null fields. */
+ private final boolean ignoreNullFields;
+
public RowDataToJsonConverters(
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
- String mapNullKeyLiteral) {
+ String mapNullKeyLiteral,
+ boolean ignoreNullFields) {
this.timestampFormat = timestampFormat;
this.mapNullKeyMode = mapNullKeyMode;
this.mapNullKeyLiteral = mapNullKeyLiteral;
+ this.ignoreNullFields = ignoreNullFields;
}
/**
@@ -331,9 +336,11 @@ public class RowDataToJsonConverters implements
Serializable {
String fieldName = fieldNames[i];
try {
Object field = fieldGetters[i].getFieldOrNull(row);
- node.set(
- fieldName,
- fieldConverters[i].convert(mapper,
node.get(fieldName), field));
+ if (field != null || !ignoreNullFields) {
+ node.set(
+ fieldName,
+ fieldConverters[i].convert(mapper,
node.get(fieldName), field));
+ }
} catch (Throwable t) {
throw new RuntimeException(
String.format("Fail to serialize at field: %s.",
fieldName), t);
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
index 9f3ccad2d01..77dfefa9505 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
@@ -44,6 +44,7 @@ import java.util.HashSet;
import java.util.Set;
import static
org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static
org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
import static
org.apache.flink.formats.json.canal.CanalJsonFormatOptions.DATABASE_INCLUDE;
import static
org.apache.flink.formats.json.canal.CanalJsonFormatOptions.IGNORE_PARSE_ERRORS;
import static
org.apache.flink.formats.json.canal.CanalJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
@@ -91,6 +92,8 @@ public class CanalJsonFormatFactory
final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ final boolean ignoreNullFields =
formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);
+
return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
public ChangelogMode getChangelogMode() {
@@ -111,7 +114,8 @@ public class CanalJsonFormatFactory
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
};
}
@@ -136,6 +140,7 @@ public class CanalJsonFormatFactory
options.add(JSON_MAP_NULL_KEY_MODE);
options.add(JSON_MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ options.add(ENCODE_IGNORE_NULL_FIELDS);
return options;
}
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java
index 362b9df6e6a..aaa292ef9df 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java
@@ -59,14 +59,16 @@ public class CanalJsonSerializationSchema implements
SerializationSchema<RowData
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
- boolean encodeDecimalAsPlainNumber) {
+ boolean encodeDecimalAsPlainNumber,
+ boolean ignoreNullFields) {
jsonSerializer =
new JsonRowDataSerializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
@Override
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
index d72fcd23deb..7fec2f43c1e 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
@@ -45,6 +45,7 @@ import java.util.HashSet;
import java.util.Set;
import static
org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static
org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
import static
org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.IGNORE_PARSE_ERRORS;
import static
org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
import static
org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_MODE;
@@ -92,6 +93,7 @@ public class DebeziumJsonFormatFactory
final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ final boolean ignoreNullFields =
formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);
return new EncodingFormat<SerializationSchema<RowData>>() {
@@ -114,7 +116,8 @@ public class DebeziumJsonFormatFactory
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
};
}
@@ -138,6 +141,7 @@ public class DebeziumJsonFormatFactory
options.add(JSON_MAP_NULL_KEY_MODE);
options.add(JSON_MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ options.add(ENCODE_IGNORE_NULL_FIELDS);
return options;
}
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
index 0dc9a96b012..7312b30593a 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
@@ -56,14 +56,16 @@ public class DebeziumJsonSerializationSchema implements
SerializationSchema<RowD
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
- boolean encodeDecimalAsPlainNumber) {
+ boolean encodeDecimalAsPlainNumber,
+ boolean ignoreNullFields) {
jsonSerializer =
new JsonRowDataSerializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
@Override
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
index 1bbbec84414..e56966753a2 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
@@ -44,6 +44,7 @@ import java.util.HashSet;
import java.util.Set;
import static
org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static
org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
import static
org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions.IGNORE_PARSE_ERRORS;
import static
org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
import static
org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_MODE;
@@ -86,6 +87,8 @@ public class MaxwellJsonFormatFactory
final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ final boolean ignoreNullFields =
formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);
+
return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
@@ -107,7 +110,8 @@ public class MaxwellJsonFormatFactory
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
};
}
@@ -130,6 +134,7 @@ public class MaxwellJsonFormatFactory
options.add(JSON_MAP_NULL_KEY_MODE);
options.add(JSON_MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ options.add(ENCODE_IGNORE_NULL_FIELDS);
return options;
}
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java
index 1fe567b08c3..ad1accdddd6 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java
@@ -56,14 +56,16 @@ public class MaxwellJsonSerializationSchema implements
SerializationSchema<RowDa
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
- boolean encodeDecimalAsPlainNumber) {
+ boolean encodeDecimalAsPlainNumber,
+ boolean ignoreNullFields) {
this.jsonSerializer =
new JsonRowDataSerializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
this.timestampFormat = timestampFormat;
}
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java
index f853983d43e..11182e93806 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java
@@ -44,6 +44,7 @@ import java.util.HashSet;
import java.util.Set;
import static
org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static
org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
import static
org.apache.flink.formats.json.ogg.OggJsonFormatOptions.IGNORE_PARSE_ERRORS;
import static
org.apache.flink.formats.json.ogg.OggJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
import static
org.apache.flink.formats.json.ogg.OggJsonFormatOptions.JSON_MAP_NULL_KEY_MODE;
@@ -99,6 +100,8 @@ public class OggJsonFormatFactory
final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ final boolean ignoreNullFields =
formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);
+
return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
@@ -120,7 +123,8 @@ public class OggJsonFormatFactory
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
};
}
@@ -143,6 +147,7 @@ public class OggJsonFormatFactory
options.add(JSON_MAP_NULL_KEY_MODE);
options.add(JSON_MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ options.add(ENCODE_IGNORE_NULL_FIELDS);
return options;
}
}
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java
index 635ff3dc7e3..f44387a5863 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java
@@ -57,14 +57,16 @@ public class OggJsonSerializationSchema implements
SerializationSchema<RowData>
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
- boolean encodeDecimalAsPlainNumber) {
+ boolean encodeDecimalAsPlainNumber,
+ boolean ignoreNullFields) {
jsonSerializer =
new JsonRowDataSerializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
private static RowType createJsonRowType(DataType databaseSchema) {
diff --git
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
index 3559e2b2c87..4430203b28e 100644
---
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
+++
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
@@ -176,6 +176,7 @@ class JsonFormatFactoryTest {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
+ true,
true);
SerializationSchema<RowData> actualSer =
@@ -227,6 +228,7 @@ class JsonFormatFactoryTest {
options.put("json.map-null-key.mode", "LITERAL");
options.put("json.map-null-key.literal", "null");
options.put("json.encode.decimal-as-plain-number", "true");
+ options.put("json.encode.ignore-null-fields", "true");
return options;
}
}
diff --git
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index ced449e0936..916b04f50f8 100644
---
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -216,7 +216,8 @@ public class JsonRowDataSerDeSchemaTest {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
- true);
+ true,
+ false);
open(serializationSchema);
byte[] actualBytes = serializationSchema.serialize(rowData);
@@ -300,7 +301,8 @@ public class JsonRowDataSerDeSchemaTest {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
- true);
+ true,
+ false);
open(serializationSchema);
// the first row
@@ -381,7 +383,8 @@ public class JsonRowDataSerDeSchemaTest {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
- true);
+ true,
+ false);
open(serializationSchema);
for (int i = 0; i < jsons.length; i++) {
@@ -496,7 +499,8 @@ public class JsonRowDataSerDeSchemaTest {
TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
- true);
+ true,
+ false);
open(serializationSchema);
ObjectNode root = OBJECT_MAPPER.createObjectNode();
@@ -538,7 +542,8 @@ public class JsonRowDataSerDeSchemaTest {
TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.FAIL,
"null",
- true);
+ true,
+ false);
open(serializationSchema1);
// expect message for serializationSchema1
String errorMessage1 =
@@ -551,7 +556,8 @@ public class JsonRowDataSerDeSchemaTest {
TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.DROP,
"null",
- true);
+ true,
+ false);
open(serializationSchema2);
// expect result for serializationSchema2
String expectResult2 = "{\"nestedMap\":{\"no-null key\":{\"no-null key\":1}}}";
@@ -562,7 +568,8 @@ public class JsonRowDataSerDeSchemaTest {
TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"nullKey",
- true);
+ true,
+ false);
open(serializationSchema3);
// expect result for serializationSchema3
String expectResult3 =
@@ -601,7 +608,8 @@ public class JsonRowDataSerDeSchemaTest {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
- true);
+ true,
+ false);
plainDecimalSerializer.open(new DummyInitializationContext());
JsonRowDataSerializationSchema scientificDecimalSerializer =
new JsonRowDataSerializationSchema(
@@ -609,6 +617,7 @@ public class JsonRowDataSerDeSchemaTest {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
+ false,
false);
scientificDecimalSerializer.open(new DummyInitializationContext());
@@ -626,6 +635,62 @@ public class JsonRowDataSerDeSchemaTest {
assertThat(scientificDecimalResult).isEqualTo(scientificDecimalJson);
}
+ @TestTemplate
+ void testSerDeMultiRowsWithNullValuesIgnored() throws Exception {
+ String[] jsons =
+ new String[] {
+ "{\"ops\":null,\"ids\":null,\"metrics\":{\"k1\":10.01,\"k2\":null}}",
+ "{\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\", \"svt\":\"2020-02-24T12:58:09.209+0800\"}, "
+ + "\"ids\":[1, 2, 3]}",
+ "{\"ops\":{\"id\":null, \"svt\":\"2020-02-24T12:58:09.209+0800\"}, "
+ + "\"ids\":[1, 2, null]}",
+ "{\"ops\":{},\"ids\":[],\"metrics\":{}}",
+ };
+
+ String[] expected =
+ new String[] {
+ "{\"metrics\":{\"k1\":10.01,\"k2\":null}}",
+ "{\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\",\"svt\":\"2020-02-24T12:58:09.209+0800\"},"
+ + "\"ids\":[1,2,3]}",
+ "{\"ops\":{\"svt\":\"2020-02-24T12:58:09.209+0800\"},\"ids\":[1,2,null]}",
+ "{\"ops\":{},\"ids\":[],\"metrics\":{}}",
+ };
+
+ RowType rowType =
+ (RowType)
+ ROW(
+ FIELD(
+ "ops",
+ ROW(FIELD("id", STRING()), FIELD("svt", STRING()))),
+ FIELD("ids", ARRAY(INT())),
+ FIELD("metrics", MAP(STRING(), DOUBLE())))
+ .getLogicalType();
+
+ JsonRowDataDeserializationSchema deserializationSchema =
+ new JsonRowDataDeserializationSchema(
+ rowType,
+ InternalTypeInfo.of(rowType),
+ false,
+ true,
+ TimestampFormat.ISO_8601);
+ open(deserializationSchema);
+ JsonRowDataSerializationSchema serializationSchema =
+ new JsonRowDataSerializationSchema(
+ rowType,
+ TimestampFormat.ISO_8601,
+ JsonFormatOptions.MapNullKeyMode.LITERAL,
+ "null",
+ false,
+ true);
+ open(serializationSchema);
+ for (int i = 0; i < jsons.length; i++) {
+ String json = jsons[i];
+ RowData row = deserializationSchema.deserialize(json.getBytes());
+ String result = new String(serializationSchema.serialize(row));
+ assertThat(result).isEqualTo(expected[i]);
+ }
+ }
+
@TestTemplate
void testJsonParse() throws Exception {
for (TestSpec spec : testData) {
@@ -648,7 +713,8 @@ public class JsonRowDataSerDeSchemaTest {
TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.FAIL,
"null",
- true);
+ true,
+ false);
open(serializationSchema);
String errorMessage = "Fail to serialize at field: f1.";
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
index 00bd5a0625e..bf2b95dac98 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
@@ -74,6 +74,7 @@ class CanalJsonFormatFactoryTest {
TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.FAIL,
"null",
+ false,
false);
SerializationSchema<RowData> actualSer =
createSerializationSchema(options);
assertThat(actualSer).isEqualTo(expectedSer);
@@ -89,6 +90,7 @@ class CanalJsonFormatFactoryTest {
options.put("canal-json.map-null-key.mode", "LITERAL");
options.put("canal-json.map-null-key.literal", "nullKey");
options.put("canal-json.encode.decimal-as-plain-number", "true");
+ options.put("canal-json.encode.ignore-null-fields", "true");
// test Deser
CanalJsonDeserializationSchema expectedDeser =
@@ -109,6 +111,7 @@ class CanalJsonFormatFactoryTest {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"nullKey",
+ true,
true);
SerializationSchema<RowData> actualSer =
createSerializationSchema(options);
assertThat(actualSer).isEqualTo(expectedSer);
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
index e45bfcc5eee..cf326f2a339 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
@@ -218,7 +218,8 @@ class CanalJsonSerDeSchemaTest {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
- true);
+ true,
+ false);
serializationSchema.open(new DummyInitializationContext());
List<String> result = new ArrayList<>();
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
index d000877b2f9..c469e0b2f95 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
@@ -81,6 +81,7 @@ class DebeziumJsonFormatFactoryTest {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
+ true,
true);
final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
@@ -200,6 +201,7 @@ class DebeziumJsonFormatFactoryTest {
options.put("debezium-json.map-null-key.mode", "LITERAL");
options.put("debezium-json.map-null-key.literal", "null");
options.put("debezium-json.encode.decimal-as-plain-number", "true");
+ options.put("debezium-json.encode.ignore-null-fields", "true");
return options;
}
}
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
index 3b9151f33a9..ffe0007f522 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
@@ -249,7 +249,8 @@ class DebeziumJsonSerDeSchemaTest {
TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
- true);
+ true,
+ false);
open(serializationSchema);
actual = new ArrayList<>();
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java
index bc47d1e68f0..54fe0804a5b 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java
@@ -69,6 +69,7 @@ class MaxwellJsonFormatFactoryTest {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
+ true,
true);
final Map<String, String> options = getAllOptions();
@@ -165,6 +166,7 @@ class MaxwellJsonFormatFactoryTest {
options.put("maxwell-json.map-null-key.mode", "LITERAL");
options.put("maxwell-json.map-null-key.literal", "null");
options.put("maxwell-json.encode.decimal-as-plain-number", "true");
+ options.put("maxwell-json.encode.ignore-null-fields", "true");
return options;
}
}
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
index 12d64fd99d0..d17d6a83534 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
@@ -187,7 +187,8 @@ class MaxwellJsonSerDerTest {
TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
- true);
+ true,
+ false);
open(serializationSchema);
List<String> result = new ArrayList<>();
for (RowData rowData : collector.list) {
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java
index c04e991a2de..f840783ca95 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java
@@ -55,6 +55,7 @@ class OggJsonFormatFactoryTest {
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
+ true,
true);
final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
@@ -137,6 +138,7 @@ class OggJsonFormatFactoryTest {
options.put("ogg-json.map-null-key.mode", "LITERAL");
options.put("ogg-json.map-null-key.literal", "null");
options.put("ogg-json.encode.decimal-as-plain-number", "true");
+ options.put("ogg-json.encode.ignore-null-fields", "true");
return options;
}
}
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
index 2fa78c89412..76e417d4ac1 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
@@ -216,7 +216,8 @@ class OggJsonSerDeSchemaTest {
TimestampFormat.SQL,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
- true);
+ true,
+ false);
open(serializationSchema);
actual = new ArrayList<>();
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java
index b796d32ba3e..fb43f6d62b8 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java
@@ -68,7 +68,10 @@ public class ResultInfoSerializer extends StdSerializer<ResultInfo> {
private static final RowDataToJsonConverters TO_JSON_CONVERTERS =
new RowDataToJsonConverters(
- TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null");
+ TimestampFormat.ISO_8601,
+ JsonFormatOptions.MapNullKeyMode.LITERAL,
+ "null",
+ false);
@Override
public void serialize(