wuchong commented on a change in pull request #13972:
URL: https://github.com/apache/flink/pull/13972#discussion_r519546625



##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
##########
@@ -36,14 +40,28 @@
                        .key("fail-on-missing-field")
                        .booleanType()
                        .defaultValue(false)
-                       .withDescription("Optional flag to specify whether to fail if a field is missing or not, false by default");
+                       .withDescription("Optional flag to specify whether to fail if a field is missing or not, false by default.");
 
        public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = ConfigOptions
                        .key("ignore-parse-errors")
                        .booleanType()
                        .defaultValue(false)
                        .withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n"
-                                       + "fields are set to null in case of errors, false by default");
+                                       + "fields are set to null in case of errors, false by default.");
+
+       public static final ConfigOption<String> MAP_NULL_KEY_MODE = ConfigOptions
+               .key("map-null-key.mode")
+               .stringType()
+               .defaultValue("FAIL")
+               .withDescription("Optional flag to control the handling mode when serializing null key for map data, FAIL by default."
+                       + " Option DROP will drop null key entries for map data."
+                       + " Option LITERAL will use 'map-null-key.literal' as key literal.");
+
+       public static final ConfigOption<String> MAP_NULL_KEY_LITERAL = ConfigOptions
+               .key("map-null-key.literal")
+               .stringType()
+               .defaultValue("null")
+               .withDescription("Optional flag to specify string literal when 'map-null-key.literal' is LITERAL, \"null\" by default.");

Review comment:
       ```suggestion
                .withDescription("Optional flag to specify string literal for null keys when 'map-null-key.mode' is LITERAL, \"null\" by default.");
   ```

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
##########
@@ -81,4 +110,89 @@ public static TimestampFormat getTimestampFormat(ReadableConfig config){
                                        String.format("Unsupported timestamp format '%s'. Validator should have checked that.", timestampFormat));
                }
        }
+
+       /**
+        * Creates handling mode for null key map data.
+        *
+        * <p>See {@link #JSON_MAP_NULL_KEY_MODE_FAIL}, {@link #JSON_MAP_NULL_KEY_MODE_DROP},
+        * and {@link #JSON_MAP_NULL_KEY_MODE_LITERAL} for more information.
+        */
+       public static MapNullKeyMode getMapNullKeyMode(ReadableConfig config){
+               String mapNullKeyMode = config.get(MAP_NULL_KEY_MODE);
+               switch (mapNullKeyMode.toUpperCase()){
+                       case JSON_MAP_NULL_KEY_MODE_FAIL:
+                               return MapNullKeyMode.FAIL;
+                       case JSON_MAP_NULL_KEY_MODE_DROP:
+                               return MapNullKeyMode.DROP;
+                       case JSON_MAP_NULL_KEY_MODE_LITERAL:
+                               return MapNullKeyMode.LITERAL;
+                       default:
+                               throw new TableException(
+                                       String.format("Unsupported map null key handling mode '%s'. Validator should have checked that.", mapNullKeyMode));
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // Inner classes
+       // --------------------------------------------------------------------------------------------
+
+       /** Handling mode for map data with null key. */
+       public enum MapNullKeyMode {
+               FAIL,
+               DROP,
+               LITERAL
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // Validation
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Validator for decoding format.
+        */
+       public static void validateDecodingFormatOptions(ReadableConfig tableOptions) {
+               boolean failOnMissingField = tableOptions.get(FAIL_ON_MISSING_FIELD);
+               boolean ignoreParseErrors = tableOptions.get(IGNORE_PARSE_ERRORS);
+               if (ignoreParseErrors && failOnMissingField) {
+                       throw new ValidationException(FAIL_ON_MISSING_FIELD.key()
+                               + " and "
+                               + IGNORE_PARSE_ERRORS.key()
+                               + " shouldn't both be true.");
+               }
+               validateTimestampFormat(tableOptions);
+       }
+
+       /**
+        * Validator for encoding format.
+        */
+       public static void validateEncodingFormatOptions(ReadableConfig tableOptions) {
+               String mapNullKeyMode = tableOptions.get(MAP_NULL_KEY_MODE);
+               // validator for {@link MAP_NULL_KEY_MODE}
+               if (!JSON_MAP_NULL_KEY_MODE_ENUM.contains(mapNullKeyMode.toUpperCase())){
+                       throw new ValidationException(
+                               String.format("Unsupported value '%s' for %s. Supported values are [FAIL, DROP, LITERAL].",
+                                       mapNullKeyMode, MAP_NULL_KEY_MODE.key()));
+               }

Review comment:
       We don't need `JSON_MAP_NULL_KEY_MODE_ENUM`; the supported values can be derived from `MapNullKeyMode.values()`.
   
   ```suggestion
                Set<String> nullKeyModes = Arrays.stream(MapNullKeyMode.values())
                        .map(Objects::toString)
                        .collect(Collectors.toSet());
                if (!nullKeyModes.contains(tableOptions.get(MAP_NULL_KEY_MODE).toUpperCase())){
                        throw new ValidationException(String.format(
                                "Unsupported value '%s' for option %s. Supported values are %s.",
                                mapNullKeyMode,
                                MAP_NULL_KEY_MODE.key(),
                                nullKeyModes));
                }
   ```
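
The idea behind the suggestion above can be tried outside Flink. The following is a minimal sketch, not the code from the pull request: the class name `NullKeyModeValidation` and the helper names are hypothetical, the enum is a local stand-in for `JsonOptions.MapNullKeyMode`, and `IllegalArgumentException` stands in for Flink's `ValidationException`. A `TreeSet` is an assumption made here so that the set prints in a stable order.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class NullKeyModeValidation {

    /** Local stand-in for JsonOptions.MapNullKeyMode (assumption, not Flink code). */
    enum MapNullKeyMode { FAIL, DROP, LITERAL }

    /** Derives the supported names from the enum, so no separate constant set is needed. */
    static Set<String> supportedModes() {
        return Arrays.stream(MapNullKeyMode.values())
                .map(Enum::name)
                .collect(Collectors.toCollection(TreeSet::new));
    }

    /** Parses a configured value case-insensitively, rejecting unknown names. */
    static MapNullKeyMode parse(String configured) {
        Set<String> modes = supportedModes();
        String normalized = configured.toUpperCase(Locale.ROOT);
        if (!modes.contains(normalized)) {
            // IllegalArgumentException stands in for ValidationException here.
            throw new IllegalArgumentException(String.format(
                    "Unsupported value '%s' for option %s. Supported values are %s.",
                    configured, "map-null-key.mode", modes));
        }
        return MapNullKeyMode.valueOf(normalized);
    }

    public static void main(String[] args) {
        System.out.println(parse("drop"));
        System.out.println(supportedModes());
    }
}
```

Because the set is built from `MapNullKeyMode.values()`, adding a new enum constant extends both the check and the error message without touching the validator.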

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
##########
@@ -250,7 +261,26 @@ private RowDataToJsonConverter createMapConverter(
                        ArrayData valueArray = map.valueArray();
                        int numElements = map.size();
                        for (int i = 0; i < numElements; i++) {
-                               String fieldName = keyArray.getString(i).toString(); // key must be string
+                               boolean isFieldNameNull = keyArray.isNullAt(i);
+                               String fieldName = null;
+                               if (isFieldNameNull) {

Review comment:
       This can be simplified by invoking `keyArray.isNullAt(i)` directly in the `if` condition.

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
##########
@@ -250,7 +261,26 @@ private RowDataToJsonConverter createMapConverter(
                        ArrayData valueArray = map.valueArray();
                        int numElements = map.size();
                        for (int i = 0; i < numElements; i++) {
-                               String fieldName = keyArray.getString(i).toString(); // key must be string
+                               boolean isFieldNameNull = keyArray.isNullAt(i);
+                               String fieldName = null;
+                               if (isFieldNameNull) {
+                                       // when map key is null
+                                       switch (mapNullKeyMode) {
+                                               case LITERAL:
+                                                       fieldName = mapNullKeyLiteral;
+                                                       break;
+                                               case DROP:
+                                                       continue;
+                                               case FAIL:
+                                                       throw new RuntimeException("Map key is null, please have a check."
+                                                               + " You can setup null key handling mode to drop entry or replace with a no-null literal.");

Review comment:
       ```suggestion
                                                        throw new RuntimeException(String.format(
                                                                "JSON format doesn't support to serialize map data with null keys. " +
                                                                        "You can drop null key entries or encode null in literals by specifying %s option.",
                                                                JsonOptions.MAP_NULL_KEY_LITERAL.key()));
   ```
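
To see the three handling modes side by side, here is a minimal, Flink-free sketch. It is an illustration under stated assumptions rather than the converter from the pull request: a plain `java.util.Map` replaces Flink's `MapData`, and the class `NullKeySketch` and method `toJsonFields` are hypothetical names. The null check is done inline in the `if` condition, without a separate boolean variable.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NullKeySketch {

    /** Local stand-in for JsonOptions.MapNullKeyMode (assumption, not Flink code). */
    enum MapNullKeyMode { FAIL, DROP, LITERAL }

    /** Copies entries into JSON field order, handling a null key according to the mode. */
    static Map<String, Object> toJsonFields(
            Map<String, ?> input, MapNullKeyMode mode, String literal) {
        Map<String, Object> fields = new LinkedHashMap<>();
        for (Map.Entry<String, ?> entry : input.entrySet()) {
            String fieldName = entry.getKey();
            if (fieldName == null) {
                switch (mode) {
                    case LITERAL:
                        // Replace the null key with the configured literal.
                        fieldName = literal;
                        break;
                    case DROP:
                        // Skip this entry; 'continue' applies to the enclosing for loop.
                        continue;
                    case FAIL:
                        throw new RuntimeException(
                                "Map data with null keys cannot be serialized in FAIL mode.");
                }
            }
            fields.put(fieldName, entry.getValue());
        }
        return fields;
    }
}
```

Calling `toJsonFields(map, MapNullKeyMode.LITERAL, "null")` on a map with one null key yields a field named `null`, while `DROP` omits that entry and `FAIL` throws.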

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
##########
@@ -96,16 +98,21 @@ public ChangelogMode getChangelogMode() {
                        DynamicTableFactory.Context context,
                        ReadableConfig formatOptions) {
                FactoryUtil.validateFactoryOptions(this, formatOptions);
+               validateEncodingFormatOptions(formatOptions);
 
                TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);
+               JsonOptions.MapNullKeyMode mapNullKeyMode = JsonOptions.getMapNullKeyMode(formatOptions);
+               String mapNullKeyLiteral = formatOptions.get(MAP_NULL_KEY_LITERAL);
 
                return new EncodingFormat<SerializationSchema<RowData>>() {
                        @Override
                        public SerializationSchema<RowData> createRuntimeEncoder(
                                        DynamicTableSink.Context context,
                                        DataType consumedDataType) {
                                final RowType rowType = (RowType) consumedDataType.getLogicalType();
-                               return new JsonRowDataSerializationSchema(rowType, timestampOption);
+                               return new JsonRowDataSerializationSchema(
+                                               rowType, timestampOption,
+                                               mapNullKeyMode, mapNullKeyLiteral);

Review comment:
       Please put each parameter on its own line.

##########
File path: docs/dev/table/connectors/formats/json.md
##########
@@ -117,6 +117,26 @@ Format Options
       </ul>
       </td>
     </tr>
+    <tr>
+      <td><h5>json.map-null-key.mode</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;"><code>'FAIL'</code></td>
+      <td>String</td>
+      <td>Specify the handling mode when serializing null keys for map data. Currently supported values are <code>'FAIL'</code>, <code>'DROP'</code> and <code>'LITERAL'</code>:
+      <ul>
+        <li>Option <code>'FAIL'</code> will throw exception when encountering map with null key.</li>
+        <li>Option <code>'DROP'</code> will drop null key entries for map data.</li>
+        <li>Option <code>'LITERAL'</code> will replace null key with string literal.</li>

Review comment:
       ```suggestion
           <li>Option <code>'LITERAL'</code> will replace null key with string literal. The string literal is defined by <code>json.map-null-key.literal</code> option.</li>
   ```



